<a href="https://colab.research.google.com/github/morganFitzg/fashion_annotation/blob/main/Fashion_Annotation_Data_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Capstone: IG Data Collection and Storage

Morgan Fitzgerald 

Instructor: Abhineet Kulkarni

This notebook is the second of two (link XX) for a project that annotates fashion images from Instagram. The first notebook laid out the business problem and built the model used to classify images.

The model takes a fashion image and predicts the category of clothing, the color, and the pattern shown.

This notebook lays out the pipeline for data collection, annotation, and storage: calls to the Apify Instagram API, data cleaning, model predictions on the collected images, and storage of the results in a MySQL database on Google Cloud.

The MySQL database is connected to a Google Data Studio report at XXX that displays the results.

Ultimately, all the functions in this notebook (except those in the Create Database section) feed into **get_and_predict**, which runs the entire pipeline.

# Setup

In [1]:
!pip install demoji
!pip install apify-client
!pip install mysql
!pip install mysql.connector

Collecting demoji
  Downloading demoji-1.1.0-py3-none-any.whl (42 kB)
Installing collected packages: demoji
Successfully installed demoji-1.1.0
Collecting apify-client
  Downloading apify_client-0.5.0-py3-none-any.whl (55 kB)
Collecting requests~=2.26.0
  Downloading requests-2.26.0-py2.py3-none-any.whl (62 kB)
Installing collected packages: requests, apify-client
  Attempting uninstall: requests
    Found existing installation: requests 2.23.0
    Uninstalling requests-2.23.0:
      Successfully uninstalled requests

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
from PIL import Image
import io
import sqlite3
import demoji
from apify_client import ApifyClient
import mysql.connector
from mysql.connector.constants import ClientFlag

from keras.models import Model
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence, to_categorical
from keras.losses import CategoricalCrossentropy

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, accuracy_score
import pickle

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
config= {
    'user': 'root',
    'password': 'IGphotos',
    'host': '35.238.73.95',
    'client_flags': [ClientFlag.SSL],
    'ssl_ca': '/content/drive/My Drive/fashion_annotation/server-ca.pem',
    'ssl_cert': '/content/drive/My Drive/fashion_annotation/client-cert.pem',
    'ssl_key': '/content/drive/My Drive/fashion_annotation/client-key.pem',
    'database': 'fashion'
}
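
The cell above embeds the database credentials directly in the notebook. A minimal sketch of one alternative is shown below, assuming the credentials are exported as environment variables before the notebook starts. The helper `build_config` and the variable names (`FASHION_DB_USER`, `FASHION_DB_PASSWORD`, `FASHION_DB_HOST`, `FASHION_DB_NAME`) are hypothetical and not part of the original project; the SSL entries would be added to the returned dictionary exactly as in the cell above.

```python
import os

def build_config(env=os.environ):
    """Build connection keyword arguments from environment variables.

    The variable names are assumptions made for this sketch.
    """
    required = ("FASHION_DB_USER", "FASHION_DB_PASSWORD", "FASHION_DB_HOST")
    missing = [name for name in required if name not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "user": env["FASHION_DB_USER"],
        "password": env["FASHION_DB_PASSWORD"],
        "host": env["FASHION_DB_HOST"],
        # fall back to the database name used in this notebook
        "database": env.get("FASHION_DB_NAME", "fashion"),
    }

# A plain dictionary stands in for os.environ here (made-up values)
example_env = {
    "FASHION_DB_USER": "demo_user",
    "FASHION_DB_PASSWORD": "demo_password",
    "FASHION_DB_HOST": "127.0.0.1",
}
example_config = build_config(example_env)
print(example_config["database"])
```

Passing the environment as an argument keeps the helper easy to try out without touching the real environment.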


# API Call Functions

The three functions below collect data from the Apify API: one queries it with fashion-related hashtags, one downloads the image from the URL provided with each post, and one parses the response data into a format compatible with MySQL records.
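
To make the target record format concrete, here is a small sketch of how one scraper result could be flattened into the ten-field tuple that `get_meta_records` builds. The helper `to_record` and all values in `sample_post` are made up for illustration, the timestamp is assumed to be an ISO 8601 string (which is what the slicing in `get_meta_records` implies), and the emoji replacement and image download are left out so the sketch runs offline.

```python
def to_record(post, image_bytes):
    """Flatten one scraper result into the tuple layout used for the metadata table."""
    timestamp = post["timestamp"]  # assumed ISO 8601, e.g. "2021-11-03T18:25:43.000Z"
    year, month, day = timestamp[0:4], timestamp[5:7], timestamp[8:10]
    # join the hashtag list into one string and cap it at 1000 characters
    hashtags = " ".join(post["hashtags"])[:1000]
    return (post["queryTag"], "None", hashtags, post["likesCount"],
            post["ownerId"], year, month, day, image_bytes, post["displayUrl"])

# Hypothetical post; only the field names come from the notebook
sample_post = {
    "type": "Image",
    "queryTag": "ootd",
    "hashtags": ["ootd", "streetstyle"],
    "likesCount": 12,
    "ownerId": "123456789",
    "timestamp": "2021-11-03T18:25:43.000Z",
    "displayUrl": "https://example.com/image.jpg",
}
record = to_record(sample_post, b"fake-image-bytes")
print(record[5:8])
```

The image bytes sit at index 8 of the tuple, which is the position `get_and_predict` reads from when it collects the image batch.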

In [4]:
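#downloads the image at the display URL and returns its raw bytes
#returns None if the request fails or the bytes are not a readable image

def download_im(url):
    try:
        resp=requests.get(url,timeout=30)
        resp.raise_for_status()
        im=resp.content
        #opening the bytes with PIL checks that they are a recognizable image
        Image.open(io.BytesIO(im))
        return im
    except Exception:
        return None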

In [5]:
# queries API for posts with listed hashtags
# num_results is the number of posts returned per hash tag
# returns the response data in json format

def make_queries(hashtag_list,num_results):
    apify_client = ApifyClient('apify_api_5KMZfJ9PIpDUrR9QFwr8KnSKAxUuxJ0KgvoN')

    hash_call={"hashtags": hashtag_list,
               "resultsLimit": num_results
              }
    # Start the actor and wait for it to finish
    actor_call = apify_client.actor('zuzka/instagram-hashtag-scraper').call(run_input=hash_call)

    # Fetch results from the actor's default dataset
    dataset_items = apify_client.dataset(actor_call['defaultDatasetId']).list_items().items
    
    return dataset_items
    

In [6]:
#converts the response data to a list of records for each post
#these records will be extended in another fn to include model results and 
#then inserted into the metadata table

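def get_meta_records(results):
    records=[]
    for resp in results:
        resp_type=resp['type']
        #keep only posts of type Image or Sidecar
        if (resp_type =='Image') or (resp_type=='Sidecar'):
            
            url=resp['displayUrl']
            img=download_im(url)
            #skip posts whose image could not be downloaded; the model needs the image bytes
            if img is None:
                continue
        
            #replace emoji with text descriptions and cap at the VARCHAR(1000) column size
            caption=resp['caption']
            caption=demoji.replace_with_desc(caption)
            if len(caption)>1000:
                caption=caption[:1000]
                
            hashtags=" ".join(resp['hashtags'])
            hashtags=demoji.replace_with_desc(hashtags)
            if len(hashtags)>1000:
                hashtags=hashtags[:1000]
        
            #split the timestamp string into year, month, and day
            timestamp=resp['timestamp']
            year=timestamp[0:4]
            month=timestamp[5:7]
            day=timestamp[8:10]

            #note: the cleaned caption is not stored; the caption field holds the placeholder string 'None'
            data=(resp['queryTag'], 'None',
                hashtags, resp['likesCount'],
                resp['ownerId'], year,
                month, day, img, resp['displayUrl'])
            records.append(data)
    return records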

# Create Database

In this section, the database tables are created.
<ol>
<li>metadata - contains data about each post as well as its annotations</li>
<li>post_counts - contains the number of total posts collected each day</li>
<li>color_counts - contains the number of posts for each color on each day as well as the percent of total posts for each color</li>
<li>pattern_counts - the same as color_counts but for patterns</li>
<li>category_counts - the same as color_counts but for categories</li>
</ol>
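
The count tables are keyed by date (plus the label value), and the update functions later in this notebook write to them with `REPLACE INTO`. The sketch below illustrates why that combination makes repeated runs safe: a second write with the same key overwrites the row instead of duplicating it. It uses an in-memory SQLite database as a stand-in for the Cloud MySQL instance, so the `?` placeholders differ from the `%s` placeholders that `mysql.connector` expects, and all numbers are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stands in for MySQL in this sketch
conn.execute("""
    CREATE TABLE color_counts (
        month INTEGER NOT NULL,
        day INTEGER NOT NULL,
        color TEXT,
        color_count INTEGER,
        post_count INTEGER,
        color_percent REAL,
        PRIMARY KEY (month, day, color))
""")

upsert = "REPLACE INTO color_counts VALUES (?, ?, ?, ?, ?, ?)"
# First run of the day: 2 of 10 posts are red
conn.execute(upsert, (11, 3, "red", 2, 10, 20.0))
# Later run on the same day: same key, so the existing row is replaced
conn.execute(upsert, (11, 3, "red", 5, 20, 25.0))
conn.commit()

rows = conn.execute("SELECT * FROM color_counts").fetchall()
conn.close()
print(rows)
```

Only one row remains for the key (11, 3, 'red'), holding the values from the later run.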

In [None]:
conn = mysql.connector.connect(**config)
cursor=conn.cursor()

query= """
CREATE TABLE metadata (
                                id INTEGER NOT NULL AUTO_INCREMENT,
                                queryTag VARCHAR(50),
                                caption VARCHAR(1000),
                                hashtags VARCHAR(1000),
                                likesCount INTEGER,
                                ownerId BIGINT,
                                year INTEGER,
                                month INTEGER,
                                day INTEGER,
                                image MEDIUMBLOB,
                                PRIMARY KEY (id))   

"""
cursor.execute(query)
conn.commit()

In [None]:
query= """
INSERT INTO metadata (queryTag, caption, hashtags, likesCount,
                    ownerId, year, month, day, image) 
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);

"""
cursor.executemany(query,records)
conn.commit()

In [54]:
conn.close()

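In [None]:
#reconnect, since the previous cell closed the connection
conn = mysql.connector.connect(**config)
cursor=conn.cursor()
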
query="""
ALTER TABLE metadata
ADD category VARCHAR(100),
ADD pattern VARCHAR(50),
ADD color VARCHAR(20);
"""
cursor.execute(query)
conn.commit()
conn.close()

In [70]:
conn = mysql.connector.connect(**config)
cursor=conn.cursor()

query="""
ALTER TABLE metadata
ADD displayUrl VARCHAR(500);
"""
cursor.execute(query)
conn.commit()
conn.close()

In [86]:
conn = mysql.connector.connect(**config)
cursor=conn.cursor()

In [None]:
query= """
CREATE TABLE post_counts (
                                month INTEGER NOT NULL,
                                day INTEGER NOT NULL,
                                post_count INTEGER,
                                PRIMARY KEY (month, day));   

"""
cursor.execute(query)
conn.commit()

In [None]:
query= """
CREATE TABLE color_counts (
                                month INTEGER NOT NULL,
                                day INTEGER NOT NULL,
                                color VARCHAR(20),
                                color_count INTEGER,
                                post_count INTEGER,
                                color_percent FLOAT,
                                PRIMARY KEY (month, day, color));   

"""
cursor.execute(query)
conn.commit()

In [87]:
query= """
CREATE TABLE pattern_counts (
                                month INTEGER NOT NULL,
                                day INTEGER NOT NULL,
                                pattern VARCHAR(50),
                                pattern_count INTEGER,
                                post_count INTEGER,
                                pattern_percent FLOAT,
                                PRIMARY KEY (month, day, pattern));   

"""
cursor.execute(query)
conn.commit()

In [88]:
query= """
CREATE TABLE category_counts (
                                month INTEGER NOT NULL,
                                day INTEGER NOT NULL,
                                category VARCHAR(100),
                                category_count INTEGER,
                                post_count INTEGER,
                                category_percent FLOAT,
                                PRIMARY KEY (month, day, category));   

"""
cursor.execute(query)
conn.commit()

# Aggregate Records

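The functions in this section count, for each day, the total number of posts and the number of posts assigned each color, pattern, and category, then write those counts (and each label's percentage of the day's posts) to the corresponding tables in the database.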
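
As a rough illustration of the aggregation, the sketch below runs the same two steps (daily totals, then per-color counts and percentages) against an in-memory SQLite database with a few made-up rows. SQLite is only a stand-in for the MySQL instance: the table definitions are trimmed to the columns the query needs, and the percentage multiplies by `100.0` because SQLite would otherwise perform integer division, which MySQL's `/` operator does not.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE metadata (id INTEGER PRIMARY KEY, month INTEGER, day INTEGER, color TEXT);
    CREATE TABLE post_counts (month INTEGER, day INTEGER, post_count INTEGER,
                              PRIMARY KEY (month, day));
""")
# Made-up annotated posts: three on 11/3, one on 11/4
posts = [(11, 3, "red"), (11, 3, "red"), (11, 3, "blue"), (11, 4, "blue")]
conn.executemany("INSERT INTO metadata (month, day, color) VALUES (?, ?, ?)", posts)

# Step 1: total posts per day
conn.execute("""
    REPLACE INTO post_counts (month, day, post_count)
    SELECT month, day, COUNT(color) FROM metadata GROUP BY month, day
""")

# Step 2: per-color counts and their share of that day's posts
color_rows = conn.execute("""
    SELECT m.month, m.day, m.color, COUNT(m.color) AS color_count,
           p.post_count, COUNT(m.color) * 100.0 / p.post_count AS color_percent
    FROM metadata m
    JOIN post_counts p ON m.month = p.month AND m.day = p.day
    GROUP BY m.month, m.day, m.color
    ORDER BY m.month, m.day, m.color
""").fetchall()
conn.close()

for row in color_rows:
    print(row)
```

Because the percentages depend on `post_counts`, the daily totals have to be refreshed before any of the per-label tables, which matches the order in which `insertRecords_` calls the update functions.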

In [8]:
def update_post_counts():
  conn = mysql.connector.connect(**config)
  cursor=conn.cursor()

  query="""
  SELECT month, day, COUNT(color) AS count
  FROM metadata
  GROUP BY day, month;
  """
  cursor.execute(query)
  result=cursor.fetchall()

  query= """
  REPLACE INTO post_counts (month, day, post_count) 
                      VALUES (%s, %s, %s);

  """
  cursor.executemany(query,result)
  conn.commit()
  conn.close()

In [9]:
def update_color_counts():
  conn = mysql.connector.connect(**config)
  cursor=conn.cursor()

  query="""
  SELECT m.month, m.day, m.color, COUNT(m.color) AS count,
  p.post_count, (COUNT(m.color)/p.post_count)*100 AS percent
  FROM metadata m
  JOIN post_counts p ON m.day=p.day AND m.month=p.month
  GROUP BY day, month, color;
  """
  cursor.execute(query)
  color_results=cursor.fetchall()

  query= """
  REPLACE INTO color_counts (month, day, color, color_count,
                      post_count, color_percent) 
                      VALUES (%s, %s, %s, %s, %s, %s);

  """
  cursor.executemany(query,color_results)
  conn.commit()
  conn.close()

In [10]:
def update_pattern_counts():
  conn = mysql.connector.connect(**config)
  cursor=conn.cursor()

  query="""
  SELECT m.month, m.day, m.pattern, COUNT(m.pattern) AS count,
  p.post_count, (COUNT(m.pattern)/p.post_count)*100 AS percent
  FROM metadata m
  JOIN post_counts p ON m.day=p.day AND m.month=p.month
  GROUP BY day, month, pattern;
  """
  cursor.execute(query)
  pattern_results=cursor.fetchall()

  query= """
  REPLACE INTO pattern_counts (month, day, pattern, pattern_count,
                      post_count, pattern_percent) 
                      VALUES (%s, %s, %s, %s, %s, %s);

  """
  cursor.executemany(query,pattern_results)
  conn.commit()
  conn.close()

In [11]:
def update_category_counts():
  conn = mysql.connector.connect(**config)
  cursor=conn.cursor()

  query="""
  SELECT m.month, m.day, m.category, COUNT(m.category) AS count,
  p.post_count, (COUNT(m.category)/p.post_count)*100 AS percent
  FROM metadata m
  JOIN post_counts p ON m.day=p.day AND m.month=p.month
  GROUP BY day, month, category;
  """
  cursor.execute(query)
  category_results=cursor.fetchall()

  query= """
  REPLACE INTO category_counts (month, day, category, category_count,
                      post_count, category_percent) 
                      VALUES (%s, %s, %s, %s, %s, %s);

  """
  cursor.executemany(query,category_results)
  conn.commit()
  conn.close()

# Prediction Pipeline

In this section, functions are defined that query and return records, process images, make predictions, and then upload the data to the MySQL database.
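
The prediction step turns the model's numeric output into text labels. A minimal sketch of that decoding is shown below, assuming (as `make_prediction` does) that the model returns one array of class scores per output, in category, pattern, color order. The label dictionaries and the scores are invented for illustration; the real keys are read from the `*_key.csv` files, and `decode` is a hypothetical helper.

```python
import numpy as np

# Hypothetical label keys mapping integer codes to text labels
category_key = {0: "dress", 1: "top"}
pattern_key = {0: "solid", 1: "striped", 2: "floral"}
color_key = {0: "black", 1: "red"}

# Stand-in for model.predict(...) on a batch of two images:
# one (n_images, n_classes) score array per output
prediction = [
    np.array([[0.9, 0.1], [0.2, 0.8]]),            # category scores
    np.array([[0.1, 0.7, 0.2], [0.6, 0.3, 0.1]]),  # pattern scores
    np.array([[0.3, 0.7], [0.8, 0.2]]),            # color scores
]

def decode(scores, key):
    """Pick the highest-scoring class in each row and map it to its text label."""
    return [key[int(i)] for i in np.argmax(scores, axis=1)]

# One (category, pattern, color) tuple per image
labels = list(zip(decode(prediction[0], category_key),
                  decode(prediction[1], pattern_key),
                  decode(prediction[2], color_key)))
print(labels)
```

Each tuple lines up with one image in the batch, so it can be appended to that image's metadata record before insertion.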

In [4]:
#download the key for the integer labels so that they can be converted to text
pattern_key=pd.read_csv('/content/drive/My Drive/fashion_annotation/pattern_key.csv')
category_key=pd.read_csv('/content/drive/My Drive/fashion_annotation/category_key.csv')
color_key=pd.read_csv('/content/drive/My Drive/fashion_annotation/color_key.csv')

In [5]:
#get dictionary for each key
color_key=color_key.set_index('col_codes').to_dict()['color']
pattern_key=pattern_key.set_index('pat_codes').to_dict()['pattern']
category_key=category_key.set_index('cat_codes').to_dict()['category']

In [17]:
#load the model
with open('/content/drive/My Drive/fashion_annotation/model_4.pickle', 'rb') as f:
    model=pickle.load(f)

In [12]:
#query API and format the records

def query_ig(num):
    
    num_resp_per_hash=num
    
    hashtags=['fashion','ootd',
              'instastyle','fashionblogger',
              'instafashion','trendsetter',
              'lookoftheday']
    
    #query api and get results in json format
    response = make_queries(hashtags,num_resp_per_hash)
    print('got responses')
    
    #put response in db format
    records = get_meta_records(response)
    print('transformed records')
    
    #records are returned without being inserted;
    #insertRecords_ stores them once predictions are attached
    return records

In [18]:
#insert records into metadata table and update the other tables

def insertRecords_(recordList):

    conn = mysql.connector.connect(**config)
    cursor = conn.cursor()
    print("Connected to Google Cloud MySQL")

    query= """
            INSERT INTO metadata (queryTag, caption, hashtags, likesCount,
            ownerId, year, month, day, image, displayUrl, category, pattern, color) 
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""
    cursor.executemany(query,recordList)
    conn.commit()
    print("Total", cursor.rowcount, "Records inserted successfully into metadata table")
    conn.close()
    update_post_counts()
    update_color_counts()
    update_pattern_counts()
    update_category_counts()
    print('updated count tables')
    print("The mySQL connection is closed")

In [19]:
#process images for VGG16

def process_images_(batch):
  #batch is a list of raw image bytes
  n=len(batch)
  imgs_array=np.zeros((n,224,224,3))

  for count,im in enumerate(batch):
    byte_im=io.BytesIO(im)
    #convert to RGB so grayscale, palette, and RGBA images all yield three channels
    img=Image.open(byte_im).convert('RGB')
    img_sized=img.resize((224,224))
    im_array=np.asarray(img_sized)
    imgs_array[count,:,:,:]=im_array
  
  #apply the VGG16 input preprocessing
  processed=preprocess_input(imgs_array)

  return processed

In [20]:
#predict annotations and convert to text labels

def make_prediction(image_batch):
  im_array = process_images_(image_batch)

  #make predictions
  prediction=model.predict(x=im_array)
  cat_pred=np.argmax(prediction[0],axis=1)
  pat_pred=np.argmax(prediction[1],axis=1)
  col_pred=np.argmax(prediction[2],axis=1)
  #get corresponding string values for predictions
  categories=np.vectorize(category_key.get)(cat_pred)
  patterns=np.vectorize(pattern_key.get)(pat_pred)
  colors=np.vectorize(color_key.get)(col_pred)
  results=list(zip(categories,patterns,colors))
  return results

In [21]:
#final function that puts it all together

def get_and_predict(num_posts):
  records=query_ig(num_posts)
  image_batch=[r[8] for r in records]

  preds=make_prediction(image_batch)
  print('made predictions')
  #append the three predicted labels (as strings) to each record
  updated_records=[tuple(record)+tuple(str(p) for p in pred)
                   for record,pred in zip(records,preds)]

  insertRecords_(updated_records)

In [26]:
get_and_predict(10)

got responses
transformed records
made predictions
Connected to Google Cloud MySQL
Total 68 Records inserted successfully into metadata table
updated count tables
The mySQL connection is closed
