<a href="https://colab.research.google.com/github/ranieri-unimi/faces-malchiodi-2022/blob/main/faces.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### run once

In [None]:
!pip install pyspark
!pip install findspark

In [None]:
import os
# Kaggle credentials are read from the environment and, when absent, requested
# interactively. An API key must never be stored in the notebook itself --
# re-encoding it (e.g. as an integer) offers no protection.
# NOTE(review): any key that was ever committed together with this notebook
# should be considered leaked and regenerated from the Kaggle account page.
from getpass import getpass

if not os.environ.get("KAGGLE_USERNAME"):
    os.environ["KAGGLE_USERNAME"] = input("Kaggle username: ").strip()
if not os.environ.get("KAGGLE_KEY"):
    # getpass keeps the key out of the cell output
    os.environ["KAGGLE_KEY"] = getpass("Kaggle API key: ").strip()

In [None]:
ref = 'bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows'
!mkdir datasets
!kaggle datasets download $ref --unzip -p ./datasets

## ukraine

In [None]:
# Number of tweets kept when the corpus is down-sampled; a falsy value
# (0 / None) disables sampling so that every tweet is used.
SAMPLE_SIZE = 10*1000

In [None]:
import numpy as np
import pandas as pd
import csv
import re
import string
import random

In [None]:
import pyspark
import findspark

In [None]:
# Load the raw tweet dump: a gzip-compressed, UTF-8 CSV read with
# csv.QUOTE_ALL quoting; the first CSV column becomes the DataFrame index.
pd.set_option("display.max_columns", None)  # show every column when a frame is displayed
filename = r"./datasets/UkraineCombinedTweetsDeduped20220227-131611.csv.gzip"
df = pd.read_csv(
    filename,
    compression='gzip',
    index_col=0,
    encoding='utf-8',
    quoting=csv.QUOTE_ALL,
)

## data cleaning

In [None]:
# Keep only the rows whose `language` column equals 'en'.
is_english = df["language"] == 'en'
df = df[is_english]

In [None]:
# From here on `df` is a plain Python list that holds only the tweet texts.
df = df["text"].tolist()

In [None]:
# Down-sample to at most SAMPLE_SIZE tweets (a falsy SAMPLE_SIZE keeps them all).
# random.sample raises ValueError when asked for more items than the population
# contains, so the request is capped at len(df).
df = random.sample(df, min(SAMPLE_SIZE, len(df))) if SAMPLE_SIZE else df

In [None]:
def preprocess_tweet_text(tweet):
    """Normalise one tweet for shingling.

    Steps, in order: lowercase the text, delete URL-like tokens (anything
    starting with ``http`` or ``www`` up to the next whitespace), turn ASCII
    punctuation plus a few typographic marks into spaces, and collapse every
    run of whitespace into a single space.

    Args:
        tweet: raw tweet text (``str``).

    Returns:
        The cleaned text, without leading or trailing whitespace.
    """
    to_blank = string.punctuation + '…’”“‘'
    blank_table = str.maketrans(to_blank, ' ' * len(to_blank))  # punctuation -> spaces
    without_urls = re.sub(r"http\S+|www\S+|https\S+", '', tweet.lower(), flags=re.MULTILINE)
    words = without_urls.translate(blank_table).split()  # split() also trims both ends
    return ' '.join(words)

## spark instance

In [None]:
from pyspark.sql import SparkSession

# Get (or create) a Spark session with master "local[*]" and keep a handle on
# its SparkContext for the RDD API used below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Distribute the tweets as (index, text) pairs; the index is the position of
# the tweet in `df` and acts as the document id in the rest of the notebook.
indexed_tweets = enumerate(df)
rdd = sc.parallelize(indexed_tweets)

In [None]:
# Peek at the first 7 (index, raw tweet text) pairs.
rdd.take(7)

In [None]:
def shiftkv(x):
    """Swap a pair: return ``(last element, first element)`` of ``x``."""
    first, last = x[0], x[-1]
    return (last, first)

In [None]:
# Clean every tweet, then drop duplicates of the *cleaned* text, keeping the
# smallest document id among identical texts.
cleaned = rdd.mapValues(preprocess_tweet_text)   # (doc, clean text)
by_text = cleaned.map(shiftkv)                   # (clean text, doc)
first_doc = by_text.reduceByKey(min)             # (clean text, smallest doc)
dataPipe = first_doc.map(shiftkv)                # (doc, clean text)

In [None]:
# Peek at 7 (doc index, cleaned tweet text) pairs left after de-duplication.
dataPipe.take(7)

In [None]:
# Number of tweets whose cleaned text is distinct.
dataPipe.count()

### hashing imports

In [None]:
!pip install crc16
import crc16
import binascii

### shingles

In [None]:
# Shingle length, in characters, used when slicing cleaned tweets into shingles.
SH_LEN = 9

In [None]:
# Character-level shingling: every window of SH_LEN consecutive characters of a
# cleaned tweet becomes a (doc index, shingle) pair. A text of length n has
# n - SH_LEN + 1 such windows -- the "+ 1" keeps the final window, which would
# otherwise be dropped. Texts shorter than SH_LEN yield no shingle.
# distinct() removes shingles repeated inside the same document.
shigles = (dataPipe
           .flatMap(lambda x : [(x[0], x[-1][i:i+SH_LEN]) for i in range(len(x[-1]) - SH_LEN + 1)])
           .distinct()
)

In [None]:
# Peek at 7 (doc index, shingle) pairs.
shigles.take(7)

In [None]:
def hashval(x):
    """Return the CRC-32 checksum of string ``x``, computed on its UTF-8 bytes.

    The result is an unsigned integer that fits in 4 bytes.
    """
    return binascii.crc32(x.encode('utf-8'))

In [None]:
# Characteristic matrix in sparse form: the distinct (doc index, shingle hash)
# pairs, each shingle being replaced by its CRC-32 value.
hashed_shingles = shigles.mapValues(hashval)
charactMx = hashed_shingles.distinct()

In [None]:
# Peek at 7 (doc index, shingle hash) pairs.
charactMx.take(7)

### MinHashing

In [None]:
def RN(stop, start=1):
    """Random integer drawn from ``range(start, stop)`` (``stop`` excluded)."""
    return random.randrange(start, stop)


def RB(B=4):
    """Random odd integer that fits in ``B`` bytes, i.e. in ``[1, 2**(8*B) - 1]``."""
    half = RN(2 ** (8 * B - 1), 0)  # drawn from [0, 2**(8*B - 1))
    return 2 * half + 1             # doubling and adding 1 forces an odd value


def PF(x, scale, shift, base):
    """Affine hash of ``x``: ``(scale * x + shift) mod base``."""
    return (shift + scale * x) % base

In [None]:
import random
n_permfoo = 15  # number of min-hash functions, i.e. signature length per document
b_bands = 3     # number of LSH bands the signature is split into

# Banding threshold approximation (1/b)**(1/r), with b = b_bands and
# r = n_permfoo // b_bands hash functions per band.
(1 / b_bands) ** (1 / (n_permfoo // b_bands))

In [None]:
# Coefficients of one affine hash (scale * x + shift) mod 2**32 per min-hash
# function; shift and scale are both random odd 4-byte integers from RB().
params = [
    dict(shift=RB(), scale=RB(), base=2 ** 32)
    for _ in range(n_permfoo)
]
params

In [None]:
def gen_perm(x):
    """Replicate one (doc, shingle) record once per min-hash function.

    Returns a list of ``((doc, h), (shingle, h))`` for ``h`` in
    ``range(n_permfoo)``: ``h`` is part of the key so that a later reduction
    runs per (document, hash function), and part of the value so that the
    reducer knows which hash function to apply.
    """
    doc, shingle = x
    records = []
    for h in range(n_permfoo):
        records.append(((doc, h), (shingle, h)))
    return records

In [None]:
def _keep_min(a, b):
    """Of two (shingle, h) candidates keep the one with the smaller h-th hash value (b on ties)."""
    coeffs = params[a[-1]]  # both candidates share the same h: it is part of the reduce key
    return a if PF(a[0], **coeffs) < PF(b[0], **coeffs) else b

# Min-hash signatures: for every (doc, h) keep the shingle whose value under
# hash function h is smallest. The stored value is the shingle hash itself,
# not its hashed image.
minHashSignMx = (charactMx
                 .flatMap(gen_perm)              # (doc, h) , (shingle, h)
                 .reduceByKey(_keep_min)         # (doc, h) , (arg-min shingle, h)
                 .mapValues(lambda v : v[0])     # (doc, h) , shingle
)

In [None]:
# Peek at 7 ((doc index, hash-function index), selected shingle hash) records.
minHashSignMx.take(7)

### LSH

In [None]:
# Assign every hash-function index h to a band with the affine map
# (scale * h + shift) mod b_bands.
# scale is drawn among the values coprime with b_bands: only then does the map
# reach every band, and equally often when b_bands divides n_permfoo. A scale
# sharing a factor with b_bands (e.g. 2 with 4 bands) would leave bands empty.
# Drawing from range(1, b_bands + 1) also keeps b_bands == 1 valid.
from math import gcd

scale = random.choice([s for s in range(1, b_bands + 1) if gcd(s, b_bands) == 1])
shift = random.randrange(b_bands)

def bandmap(x):
    """Band index, in ``range(b_bands)``, of hash-function index ``x``."""
    return PF(x, scale, shift, b_bands)

In [None]:
def band_expand(x):
    """Re-key a signature entry by band.

    ``((doc, h), v)`` becomes ``((doc, bandmap(h)), (h, v))``; ``h`` is kept in
    the value so entries of one band can still be ordered by it.
    """
    key, v = x
    doc, h = key
    band = bandmap(h)
    return ((doc, band), (h, v))

def band_reduct(x):
    """Turn a document's band into a bucket key.

    ``((doc, band), values)`` becomes ``((hash(tuple(values)), band), doc)``,
    so documents whose values in that band are equal share the same key.
    """
    key, a = x
    doc, band = key
    bucket = hash(tuple(a))
    return ((bucket, band), doc)

In [None]:
def _ordered_values(pairs):
    """Values of (hashperm, valshin) pairs, ordered by hashperm."""
    return [v for _, v in sorted(pairs, key=lambda p: p[0])]

# Per (doc, band): the signature values that fall in that band, listed in
# hash-function order so that equal bands of two documents compare equal.
bandMx = (minHashSignMx                 # (doc, hashperm) , valshin
          .map(band_expand)             # (doc, band) , (hashperm, valshin)
          .groupByKey()                 # (doc, band) , iterable of (hashperm, valshin)
          .mapValues(_ordered_values)   # (doc, band) , [valshin, valshin, valshin]
)

In [None]:
# Peek at 7 ((doc index, band), [signature values of that band]) records.
bandMx.take(7)

In [None]:
# Group documents by (hash of their band content, band): documents that share
# a key agree on that whole band (or collide under hash()).
by_bucket = bandMx.map(band_reduct)                     # (HHH_valshin, band), doc
bandBuckets = by_bucket.groupByKey().mapValues(list)    # (HHH_valshin, band), [doc, doc, doc]

In [None]:
# Buckets shared by at least two documents are the candidate groups of similar
# tweets; they are collected on the driver as ((bucket hash, band), [doc, ...]).
shared_buckets = bandBuckets.filter(lambda kv: len(kv[1]) >= 2)
lst = shared_buckets.collect()

In [None]:
# Show every collected bucket: ((bucket hash, band), [doc indices]).
lst

In [None]:
# Print the original (uncleaned) tweets of each candidate bucket: one dashed
# line after every tweet, one long underscore line after every bucket.
for _bucket_key, docs in lst:
    for doc in docs:
        print(df[doc])
        print('---------------')
    print('___________________________________________________________________________________________')