In [1]:
# IMPORTS
import io
import json
import re
import cv2 
import pytesseract
from PIL import Image, ExifTags
from os.path import exists
from os import listdir, environ
from google.cloud import vision
from google.cloud.vision_v1 import AnnotateImageResponse
from typing import Literal
import datetime
from dateutil import parser
import csv

In [291]:
# CONSTANTS
# Service-account key file picked up by the Google Cloud client library.
environ["GOOGLE_APPLICATION_CREDENTIALS"]="./key.json"

# pytesseract reads the binary location from `pytesseract.pytesseract.tesseract_cmd`;
# assigning `pytesseract.tesseract_cmd` only creates an unused attribute on the
# package. The path is machine-specific, so it is applied only when it exists;
# otherwise pytesseract keeps its default lookup.
TESSERACT_CMD = r'/usr/local/Cellar/tesseract/5.0.1/bin/tesseract'
if exists(TESSERACT_CMD):
    pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD

# Input screenshots and output directory (debug images + Vision API responses).
IN_DIR = './src/images'
OUT_DIR = './src/out'

# Drawing colours, written as BGR tuples for the cv2 drawing calls.
COLOR_RED = (0, 0, 255)
COLOR_GREEN = (0, 255, 0)
COLOR_BLUE = (255, 0, 0)
COLOR_YELLOW = (0, 255, 255)
COLOR_WHITE = (255, 255, 255)

# Text rendering settings passed to cv2.putText.
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE_NORMAL = 1
FONT_SCALE_SMALL = 0.6
THICKNESS_NORMAL = 2
THICKNESS_THIN = 1
LINE_TYPE = 2

# Maximum centre-to-centre distance accepted by `near`.
THRESHOLD = 50
# Template names whose nearby text is read as a counter.
SYMBOLS = ['play', 'heart', 'comment', 'save']
# Tokens that mark a reel as using original audio.
AUDIO_TARGETS = ['inu.tomodachi', '·', 'Original', 'Audio']
MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
# Sentinel for values that could not be extracted reliably.
UK = 'unknown'

In [271]:
# HELPER FUNCTIONS

def remove_extension(filename):
  """Return `filename` with everything from its first dot onwards removed.

  The pattern needs at least one character after the dot, so a name that merely
  ends in a dot is returned unchanged. Names with several dots are cut at the
  first one ('a.b.c' -> 'a').
  """
  # Raw string: '\.' inside a normal literal is an invalid escape sequence.
  return re.sub(r'\..+', '', filename)

def tesseract_ready_image(filename):
    """Read `filename` from IN_DIR with OpenCV.

    Returns whatever `cv2.imread` returns for an existing path, or None when
    the path does not exist.
    """
    image_path = f'{IN_DIR}/{filename}'
    if not exists(image_path):
        return None
    return cv2.imread(image_path)

def get_image_data(img):
    """Run Tesseract on `img` and return its word-level data as a dict.

    Returns None when there is nothing to analyse: `img` is None (the image
    could not be loaded) or `img.any()` is falsy (every value is zero).
    """
    # A failed load yields None; calling `.any()` on it would raise AttributeError.
    if img is None or not img.any():
        return None
    return pytesseract.image_to_data(img, output_type='dict')

def mark_text_boxes(img, filename):
    """Draw every Tesseract box, its text and its origin on `img`, then save it.

    The annotated image is written to OUT_DIR under `filename`. `img` is
    modified in place by the drawing calls.
    """
    ocr = get_image_data(img)
    for idx in range(len(ocr['level'])):
        left = ocr['left'][idx]
        top = ocr['top'][idx]
        right = left + ocr['width'][idx]
        bottom = top + ocr['height'][idx]

        # Outline of the detected box.
        cv2.rectangle(img, (left, top), (right, bottom), COLOR_YELLOW, 1)
        # Recognised text, 30px below the box.
        cv2.putText(img, ocr['text'][idx], (left, bottom + 30), FONT,
                    FONT_SCALE_NORMAL, COLOR_RED, THICKNESS_NORMAL, LINE_TYPE)
        # Top-left coordinates of the box, 60px below it.
        cv2.putText(img, f'({left},{top})', (left, bottom + 60), FONT,
                    FONT_SCALE_SMALL, COLOR_GREEN, THICKNESS_THIN, LINE_TYPE)

    cv2.imwrite(f'{OUT_DIR}/{filename}', img)


In [272]:
def get_image_metadata(filename):
  """Return the EXIF tags of `filename` (in IN_DIR) keyed by tag name.

  Returns None when the file does not exist and an empty dict when the image
  carries no EXIF data. Tag ids missing from `ExifTags.TAGS` are dropped.
  """
  path = f'{IN_DIR}/{filename}'
  if not exists(path):
    return None
  # The context manager closes the file handle once the EXIF block is read.
  with Image.open(path) as img:
    # `_getexif` is not defined for every image format and returns None when
    # the file has no EXIF block.
    read_exif = getattr(img, '_getexif', None)
    exif = read_exif() if read_exif is not None else None
  if not exif:
    return {}
  return {
    ExifTags.TAGS[k]: v for k, v in exif.items() if k in ExifTags.TAGS
  }

In [273]:
def detect_text_using_google_vision_api(filename):
    """Send one image to Google Vision text detection and save the response.

    The response is serialised to `<OUT_DIR>/<name without extension>.json`.

    Raises:
        Exception: when the response carries an error message. The JSON file
            has already been written by then.
    """
    client = vision.ImageAnnotatorClient()
    with io.open(f'{IN_DIR}/{filename}', 'rb') as image_file:
        image = vision.Image(content=image_file.read())
    response = client.text_detection(image=image)

    json_path = f'{OUT_DIR}/{remove_extension(filename)}.json'
    with open(json_path, 'w') as out:
        out.write(AnnotateImageResponse.to_json(response))

    error_message = response.error.message
    if error_message:
        raise Exception(
            '{}\nFor more info on error messages, check: '
            'https://cloud.google.com/apis/design/errors'.format(error_message))

In [274]:
# Input image names. `.DS_Store` is filtered out rather than `remove`d, because
# `list.remove` raises ValueError on machines where that entry does not exist.
# Sorted so that the processing order (and the CSV row order) is deterministic.
files = sorted(name for name in listdir(IN_DIR) if name != '.DS_Store')
# filtering
# files = list(filter(lambda x: int(re.search(r'\d+', x).group()) > 8217, files))

In [275]:
# TEXT RECOGNITION USING GOOGLE VISION API
# One Vision API call per input image; each response is saved as JSON.
for image_name in files:
  detect_text_using_google_vision_api(image_name)

In [276]:
# TEXT RECOGNITION USING TESSERACT (saving images for debugging)
# Debug-only step, disabled by default. An explicit flag replaces the former
# leading `break`, which left the rest of the loop body unreachable.
SAVE_TESSERACT_DEBUG_IMAGES = False

if SAVE_TESSERACT_DEBUG_IMAGES:
  for file in files:
    img = tesseract_ready_image(file)
    if img is None:
      # Missing or unreadable image: nothing to annotate.
      continue
    mark_text_boxes(img, file)

In [277]:
# creating meta files

def serialize_meta(meta):
  """Serialise the EXIF fields used downstream as an indented JSON string.

  `meta` must contain the keys ExifImageWidth, ExifImageHeight,
  DateTimeOriginal, Orientation and ExifOffset; a missing one raises KeyError.
  """
  # (output key, EXIF tag name), in output order.
  exif_fields = (
    ('width', 'ExifImageWidth'),
    ('height', 'ExifImageHeight'),
    ('time', 'DateTimeOriginal'),
    ('orientation', 'Orientation'),
    ('offset', 'ExifOffset'),
  )
  summary = {out_key: meta[exif_key] for out_key, exif_key in exif_fields}
  return json.dumps(summary, indent=2)

# For every image, write its EXIF summary to ./src/meta and its raw Tesseract
# data to ./src/tesseract. The loop variables are deliberately not named `d`/`m`:
# the global `d` is the directory map that `pather` reads, so rebinding it here
# would break `pather` whenever this cell is re-run after that one.
for file in files:
  image = tesseract_ready_image(file)
  ocr_data = get_image_data(image)
  exif_meta = get_image_metadata(file)
  fname = remove_extension(file)
  with open(f'./src/meta/{fname}.json', 'w') as f:
    f.write(serialize_meta(exif_meta))
  with open(f'./src/tesseract/{fname}.json', 'w') as f:
    f.write(json.dumps(ocr_data, indent=2))


In [278]:
# testing stuff

# Maps each artifact type to the sub-directory of ./src that holds its JSON files.
TYPE_DIRS = {
  'meta': 'meta',
  'gva': 'out',
  'tr': 'tesseract',
  'tm': 'matching'
}
# Old name, kept as an alias for existing references. `pather` no longer reads
# it, so rebinding `d` in another cell cannot break path building.
d = TYPE_DIRS
AllowedTypes = Literal['meta', 'gva', 'tr', 'tm']

def pather(fn, type: AllowedTypes):
  """Return the JSON path for image `fn` and artifact `type`.

  `type` is one of 'meta', 'gva', 'tr', 'tm'; any other value raises KeyError.
  """
  fname = remove_extension(fn)
  return f'./src/{TYPE_DIRS[type]}/{fname}.json'

def json_opner(path):
  """Read the file at `path` and return its parsed JSON content."""
  with open(path, 'r') as handle:
    return json.load(handle)

def jsoned(file, type: AllowedTypes):
  """Load the saved JSON artifact of kind `type` for image `file`."""
  artifact_path = pather(file, type)
  return json_opner(artifact_path)
  
# Load the artifacts of the first image only, for interactive inspection.
if files:
  file = files[0]
  meta = jsoned(file, 'meta')
  gva = jsoned(file, 'gva')
  tr = jsoned(file, 'tr')

In [279]:
def indexer(data, ix):
  """Return entry `ix` of the column-oriented Tesseract dict as one record.

  `data` maps each field name to a list; the result maps each field name to
  the element at position `ix`.
  """
  fields = (
    'level', 'page_num', 'block_num', 'par_num', 'line_num', 'word_num',
    'left', 'top', 'width', 'height', 'conf', 'text',
  )
  return {field: data[field][ix] for field in fields}

In [280]:
# testing template matching 
# Template names (files in ./src/assets) and the colour used to outline each one.
assets = ['comment', 'heart', 'accounts_reached', 'music', 'play', 'save', 'dm', 'duration']
col = [(0, 255, 50), (255, 0, 0), (255, 255, 0), (0, 255, 255), (255, 0, 255), (200, 200, 200), (100, 200, 0), (90, 255, 190)]

# Debug-only step, disabled by default. An explicit flag replaces the former
# leading `break`, which left the rest of the loop body unreachable.
SAVE_TEMPLATE_MATCHING_DEBUG_IMAGES = False

if SAVE_TEMPLATE_MATCHING_DEBUG_IMAGES:
  for file in files:
    img = cv2.imread(f'./src/images/{file}')
    img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # `assets` and `col` are parallel lists, so pair them directly.
    for asset, color in zip(assets, col):
      template = cv2.imread(f'./src/assets/{asset}.jpg', 0)
      w, h = template.shape[::-1]
      res = cv2.matchTemplate(img_gray, template, cv2.TM_CCOEFF_NORMED)
      # Only the location of the best match is used.
      _, _, _, max_loc = cv2.minMaxLoc(res)
      cv2.rectangle(img, max_loc, (w + max_loc[0], h + max_loc[1]), color, 3)
    cv2.imwrite(f'./src/template_matching/{file}', img)

In [281]:
# saving result of template matching
# Templates do not depend on the image, so read each one once up front
# instead of once per image.
asset_templates = {}
for asset in assets:
  template = cv2.imread(f'./src/assets/{asset}.jpg', 0)
  if template is None:
    # cv2.imread returns None instead of raising for a missing/unreadable file.
    raise FileNotFoundError(f'template image not found or unreadable: ./src/assets/{asset}.jpg')
  asset_templates[asset] = template

# For every image, store the size and best-match location of each template in
# ./src/matching/<name>.json.
for file in files:
  img = cv2.imread(f'./src/images/{file}')
  if img is None:
    raise ValueError(f'image not found or unreadable: ./src/images/{file}')
  img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

  sv = {}
  for asset, template in asset_templates.items():
    w, h = template.shape[::-1]
    res = cv2.matchTemplate(img_gray, template, cv2.TM_CCOEFF_NORMED)
    # Only the location of the best match is kept.
    _, _, _, max_loc = cv2.minMaxLoc(res)
    sv[asset] = {'width': w, 'height': h, 'loc': max_loc}

  with open(f'./src/matching/{remove_extension(file)}.json', 'w') as f:
    f.write(json.dumps(sv, indent=2))

In [288]:
# Reading & looking for the right data
def tesseract_perser(data):
  """Turn the column-oriented Tesseract dict into a list of per-entry records."""
  return [indexer(data, ix) for ix in range(len(data['level']))]

def google_vision_api_parser(data):
  """Return the `textAnnotations` entry of a saved Vision API response."""
  annotations = data['textAnnotations']
  return annotations

def check_type(data):
  """Classify one OCR record: 'gva' if it has a 'description' key, else 'tr'."""
  return 'gva' if 'description' in data else 'tr'

def get_usefuldata(data, symbol, meta):
  """Bundle the values the extractors need into one list.

  Returns [record type of data[0], image width, image height,
  symbol width, symbol height, symbol location].
  """
  source = check_type(data[0])
  image_size = [meta['width'], meta['height']]
  symbol_box = [symbol['width'], symbol['height'], symbol['loc']]
  return [source, *image_size, *symbol_box]
  
def get_center_cord(x, y, width, height):
  """Return [cx, cy], the point half a width and half a height away from (x, y)."""
  center_x = x + width / 2
  center_y = y + height / 2
  return [center_x, center_y]

def near(tg: int, bs: int) -> bool:
  """Tell whether `tg` and `bs` differ by less than THRESHOLD."""
  distance = abs(bs - tg)
  return distance < THRESHOLD

def get_lower_text(data, symbol, meta):
  """Collect the OCR records whose centre is `near` the symbol's centre.

  `data` is a list of Tesseract records or Vision annotations; its kind is
  taken from the first element. Returns the matching records in input order.
  """
  [d_type, _, _, width, height, [x, y]] = get_usefuldata(data, symbol, meta)
  [cx, cy] = get_center_cord(x, y, width, height)

  found = []
  if d_type == 'tr':
    for word in data:
      word_cx, word_cy = get_center_cord(word['left'], word['top'], word['width'], word['height'])
      if near(word_cx, cx) and near(word_cy, cy):
        found.append(word)
    return found
  if d_type == 'gva':
    for annotation in data:
      top_left, top_right, bottom_right, _ = annotation['boundingPoly']['vertices']
      # NOTE(review): the height is taken as (second vertex y - third vertex y).
      # If the vertices run top-left, top-right, bottom-right this is negative,
      # putting the computed centre above the box. Kept as is, since the
      # thresholds may have been tuned against it; confirm before changing.
      ann_cx, ann_cy = get_center_cord(
        top_left['x'],
        top_left['y'],
        top_right['x'] - top_left['x'],
        top_right['y'] - bottom_right['y'],
      )
      if near(ann_cx, cx) and near(ann_cy, cy):
        found.append(annotation)
    return found
  return None

def check_for_original_music(data, symbol, meta):
  """Tell whether an AUDIO_TARGETS token sits to the right of the symbol.

  A record qualifies when its left edge is beyond the symbol's centre x, its
  top is within 30px of the symbol's centre y, and its text is one of
  AUDIO_TARGETS.
  """
  [d_type, _, _, width, height, [x, y]] = get_usefuldata(data, symbol, meta)
  [cx, cy] = get_center_cord(x, y, width, height)

  def tesseract_hit(word):
    return word['left'] > cx and abs(word['top'] - cy) < 30 and word['text'] in AUDIO_TARGETS

  def vision_hit(annotation):
    [top_left, _, _, _] = annotation['boundingPoly']['vertices']
    return (
      top_left['x'] > cx
      and abs(top_left['y'] - cy) < 30
      and annotation['description'] in AUDIO_TARGETS
    )

  if d_type == 'tr':
    return any(tesseract_hit(word) for word in data)
  if d_type == 'gva':
    return any(vision_hit(annotation) for annotation in data)
  return False

def _sort_duration_date_token(text, duls, yels, mols, dals):
  """Append `text` to each of the four lists whose pattern it matches."""
  # Raw strings: '\d' inside a normal literal is an invalid escape sequence.
  clock = re.search(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]$', text)
  if clock:
    duls.append(clock.group())

  if re.search(r'^20\d{2}$', text):
    yels.append(text)

  if text in MONTHS:
    mols.append(text)

  # Day of month, optionally followed by a comma that is stripped.
  if re.search(r'^[1-3]?[0-9],?$', text):
    dals.append(text.replace(',', ''))

def get_duration_date(data, symbol, meta):
  """Collect duration, year, month and day candidates on the symbol's row.

  A record is considered when its centre is roughly level with the symbol's
  centre but horizontally apart from it (10px for Tesseract records, 30px for
  Vision annotations, for both limits).

  Returns [durations, years, months, days], each a list of strings in input
  order.
  """
  [d_type, _, _, width, height, [x, y]] = get_usefuldata(data, symbol, meta)
  [cx, cy] = get_center_cord(x, y, width, height)

  duls = []
  yels = []
  mols = []
  dals = []
  if (d_type == 'tr'):
    for it in data:
      [itcx, itcy] = get_center_cord(it['left'], it['top'], it['width'], it['height'])
      if abs(itcx - cx) > 10 and abs(itcy - cy) < 10:
        _sort_duration_date_token(it['text'], duls, yels, mols, dals)
  elif (d_type == 'gva'):
    for it in data:
      [lt, rt, rb, _] = it['boundingPoly']['vertices']
      # NOTE(review): the height argument (rt y - rb y) looks negative for
      # top-to-bottom vertex order; kept unchanged, confirm before fixing.
      [itcx, itcy] = get_center_cord(lt['x'], lt['y'], (rt['x'] - lt['x']), (rt['y'] - rb['y']))
      if abs(itcx - cx) > 30 and abs(itcy - cy) < 30:
        _sort_duration_date_token(it['description'], duls, yels, mols, dals)

  return [duls, yels, mols, dals]

def get_accounts_reached(data, symbol, meta):
  """Collect number-like strings located around the accounts-reached symbol.

  A record is considered when its centre is within 20px horizontally and
  120px vertically of the symbol's centre; Tesseract records must also be
  more than 10px away vertically. Thousands separators are removed from the
  matched text.

  Returns the list of matched strings in input order.
  """
  [d_type, _, _, width, height, [x, y]] = get_usefuldata(data, symbol, meta)
  [cx, cy] = get_center_cord(x, y, width, height)

  # Raw string: '\.' inside a normal literal is an invalid escape sequence.
  # NOTE(review): `|` binds looser than the anchors, so the first alternative is
  # not anchored at the end and matches a numeric prefix ('1234' -> '123').
  # Left unchanged because downstream results may rely on it.
  number_pattern = r'^[0-9]{1,3}(,[0-9]{3})*(\.[0-9]+)?|\.[0-9]$'

  ls = []
  if (d_type == 'tr'):
    for it in data:
      [itcx, itcy] = get_center_cord(it['left'], it['top'], it['width'], it['height'])
      if abs(itcx - cx) < 20 and 10 < abs(itcy - cy) < 120:
        s = re.search(number_pattern, it['text'])
        if s:
          ls.append(s.group().replace(',', ''))
  elif (d_type == 'gva'):
    for it in data:
      [lt, rt, rb, _] = it['boundingPoly']['vertices']
      # NOTE(review): the height argument (rt y - rb y) looks negative for
      # top-to-bottom vertex order; kept unchanged, confirm before fixing.
      [itcx, itcy] = get_center_cord(lt['x'], lt['y'], (rt['x'] - lt['x']), (rt['y'] - rb['y']))
      if abs(itcx - cx) < 20 and abs(itcy - cy) < 120:
        s = re.search(number_pattern, it['description'])
        if s:
          ls.append(s.group().replace(',', ''))
  return ls

def create_best_guess(data, meta, templates):
  """Run the matching extractor for every template symbol.

  `templates` maps a symbol name to its match info (width, height, loc).
  Returns a dict with one entry per symbol: counter symbols get the nearby
  records, 'music' a bool, 'duration' the four candidate lists and
  'accounts_reached' a list of number strings. Symbols without an extractor
  get None.
  """
  r = {}
  for symbol in templates:
    tmp = templates[symbol]

    if symbol in SYMBOLS:
      txt = get_lower_text(data, tmp, meta)
    elif symbol == 'music':
      txt = check_for_original_music(data, tmp, meta)
    elif symbol == 'duration':
      txt = get_duration_date(data, tmp, meta)
    elif symbol == 'accounts_reached':
      txt = get_accounts_reached(data, tmp, meta)
    else:
      # No extractor for this symbol (e.g. 'dm'). Without this branch `txt`
      # kept the previous symbol's value, or was unbound on the first pass.
      txt = None

    r[symbol] = txt
  return r

def ifelseunknown(tr, gva, i):
  """Return the first candidate of slot `i` when both sources agree on it.

  Looks at `duration[i]` of both results; returns UK when either list is
  empty or when the two first candidates differ.
  """
  tr_values = tr['duration'][i]
  if len(tr_values) == 0:
    return UK
  gva_values = gva['duration'][i]
  if len(gva_values) == 0:
    return UK
  first = tr_values[0]
  if first == gva_values[0]:
    return first
  return UK

def toseconds(d):
  """Convert an 'M:SS' string to a number of seconds."""
  minutes, seconds = d.split(':')
  total = int(seconds)
  total += int(minutes) * 60
  return total

def numericlist(data):
  """Convert the text of every OCR record to an int, using 0 when it is not one.

  The text key is 'description' when the first record has that key (Vision
  annotation) and 'text' otherwise (Tesseract record), the same rule
  `check_type` applies. Returns [] for empty input.
  """
  if len(data) == 0:
    return []
  key = 'description' if 'description' in data[0] else 'text'
  # `isdecimal` rather than `isnumeric`: the latter also accepts characters
  # such as superscripts and fractions, which int() rejects with ValueError.
  return [int(item[key]) if item[key].isdecimal() else 0 for item in data]

def compare_data(tr, gva):
  """Merge the Tesseract and Vision guesses into one record.

  Counter symbols: the value occurring exactly twice among all candidates
  (the last such value wins), otherwise the largest candidate, with 0 as the
  floor. 'music': logical `and` of both guesses. 'duration': duration (in
  seconds), year, month and day, each UK unless both sources agree.
  'accounts_reached': first Vision candidate, then first Tesseract candidate,
  then UK, stored under 'reach'.
  """
  merged = {}
  for symbol in tr:
    if symbol in SYMBOLS:
      candidates = numericlist(tr[symbol]) + numericlist(gva[symbol])

      chosen = UK
      for value in candidates:
        if candidates.count(value) == 2:
          chosen = value
      if chosen == UK:
        # No value seen exactly twice: fall back to the largest one.
        chosen = max([0, *candidates])
      merged[symbol] = chosen

    elif symbol == 'music':
      merged[symbol] = tr[symbol] and gva[symbol]

    elif symbol == 'duration':
      duration = ifelseunknown(tr, gva, 0)
      merged['duration'] = duration
      if duration != UK:
        merged['duration'] = toseconds(duration)
      merged['year'] = ifelseunknown(tr, gva, 1)
      merged['month'] = ifelseunknown(tr, gva, 2)
      merged['day'] = ifelseunknown(tr, gva, 3)

    elif symbol == 'accounts_reached':
      if len(gva['accounts_reached']) > 0:
        reach = gva['accounts_reached'][0]
      elif len(tr['accounts_reached']) > 0:
        reach = tr['accounts_reached'][0]
      else:
        reach = UK
      merged['reach'] = reach
  return merged

def extract_data(file):
  """Build the final CSV row for one image from its saved JSON artifacts.

  Loads the meta, template-matching, Tesseract and Vision files for `file`,
  merges both OCR guesses and derives the upload date and the number of days
  between upload and the EXIF capture date.
  """
  fn = remove_extension(file)
  meta = jsoned(file, 'meta')
  tm = jsoned(file, 'tm')

  trdata = create_best_guess(tesseract_perser(jsoned(file, 'tr')), meta, tm)
  gvadata = create_best_guess(google_vision_api_parser(jsoned(file, 'gva')), meta, tm)
  final = compare_data(trdata, gvadata)

  date_format = "%Y-%m-%d"

  # meta['time'] is split into a date and a time part; the date uses ':' separators.
  [date_part, _] = meta['time'].split(' ')
  [taken_year, taken_month, taken_day] = date_part.split(':')
  taken = datetime.datetime(int(taken_year), int(taken_month), int(taken_day))

  uploaded = UK
  elapsed = 0
  if final['month'] != UK and final['day'] != UK:
    # The year is not read from the image: January/February are treated as
    # 2022 and every other month as 2021.
    upload_year = 2022 if final['month'] in ['January', 'February'] else 2021
    upload_date = parser.parse(f"{final['month']} {final['day']}, {upload_year}")
    elapsed = (taken - upload_date).days
    uploaded = upload_date.strftime("%Y-%m-%d")

  reach = final['reach']
  if reach != UK and reach.isnumeric():
    reach = int(reach)
  else:
    reach = UK

  return {
    'file_name': fn,
    'is_original_audio': final['music'],
    'uploaded_at': uploaded,
    'duration': final['duration'],
    'play': final['play'],
    'like': final['heart'],
    'comment': final['comment'],
    'save': final['save'],
    'reach': reach,
    'taken_at': taken.strftime(date_format),
    'elapsed': elapsed,
  }


In [289]:
# extracting data
# `.DS_Store` is filtered out rather than `remove`d, because `list.remove`
# raises ValueError on machines where that entry does not exist. Sorted so the
# CSV rows come out in a deterministic order.
files = sorted(name for name in listdir(IN_DIR) if name != '.DS_Store')

# One row per image.
result = [extract_data(file) for file in files]

In [290]:
# writing to csv
# The csv module requires files opened with newline=''; without it, platforms
# that translate line endings get an extra blank line after every row.
with open('./data.csv', 'w', encoding='UTF8', newline='') as f:
  writer = csv.DictWriter(f, fieldnames=list(result[0].keys()))
  writer.writeheader()
  writer.writerows(result)
    

# The cells below use data.csv after its "unknown" values were filled in manually.

In [293]:
import pandas as pd
from sklearn import linear_model

url = 'https://raw.githubusercontent.com/kentozuka/data_and_social_media_analysis/main/data.csv'
# Not bound to the name `csv`: that would shadow the stdlib `csv` module, so
# re-running the CSV-writing cell afterwards would fail on `csv.DictWriter`.
reach_raw = pd.read_csv(url)

# Drop the columns that are not used as features and shorten the audio column name.
df = (
  reach_raw
  .drop(['file_name', 'uploaded_at', 'taken_at', 'play'], axis=1)
  .rename(columns={'is_original_audio': 'audio'})
)
df['audio'] = df['audio'].astype(int)

# Feature columns used to predict `reach`.
targets = ['duration', 'like', 'comment', 'save',  'elapsed']

reg = linear_model.LinearRegression()
reg.fit(df[targets], df.reach)
print(reg.coef_, reg.intercept_)

In [None]:
# Example reel to predict the reach for.
test = {
  'duration': 31,
  'like': 1996,
  'comment': 0,
  'save': 869,
  'elapsed': 125
}

# The original call was missing its closing parenthesis (SyntaxError).
# A one-row frame with the training column order keeps the feature names the
# model was fitted with.
reg.predict(pd.DataFrame([test])[targets])