# imports and configs

In [1]:
import os
import sys
import pathlib

In [2]:
# Glob matching every parquet shard of the users table. The location can be
# overridden with the DH_USERS_PATH environment variable so the notebook is not
# tied to one machine; the default keeps the original path.
USERS_PATH = os.environ.get('DH_USERS_PATH', '/home/smadani/data/dh_users/*.parquet')

In [3]:
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
import pickle

from glob import glob
from pyspark.sql import SparkSession
from tqdm import tqdm

# Sizing of the local Spark session, gathered in one place so it can be tuned
# without touching the builder chain below.
SPARK_LOCAL_CORES = 60         # worker threads used by the local[...] master
SPARK_DRIVER_MEMORY_GB = 30    # value of spark.driver.memory, in gigabytes
SPARK_MAX_RESULT_SIZE_GB = 10  # value of spark.driver.maxResultSize, in gigabytes

spark = (
    SparkSession
    .builder
    .master(f"local[{SPARK_LOCAL_CORES}]")
    .config("spark.driver.memory", f"{SPARK_DRIVER_MEMORY_GB}g")
    .config("spark.driver.maxResultSize", f"{SPARK_MAX_RESULT_SIZE_GB}g")
    # Returns the session already running in this kernel if there is one.
    .getOrCreate()
)


22/06/24 09:38:44 WARN Utils: Your hostname, achtung07 resolves to a loopback address: 127.0.0.1; using 192.168.2.7 instead (on interface enp65s0f0)
22/06/24 09:38:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/06/24 09:38:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/24 09:38:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/24 09:38:46 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


# load users table

In [4]:
# Read every parquet file matched by USERS_PATH and keep a single row per `uid`.
users_df = (
    spark.read
    .parquet(USERS_PATH)
    .dropDuplicates(subset=['uid'])
)

                                                                                

## extract PIs

In [11]:
import re
import emoji


def distinct_emoji_list(string):
    """Return the set of distinct emojis found in ``string``."""
    found = set()
    for match in emoji.emoji_list(string):
        found.add(match['emoji'])
    return found


def clean_personal_marker(phrase):
    """Normalise a single clause extracted from a profile description.

    Non-ASCII characters are dropped, then a fixed sequence of clean-up steps
    (whitespace collapsing, removal of leading filler such as "i am a" or
    "the", removal of social-handle mentions, stripping of surrounding
    punctuation) is applied repeatedly until the text stops changing.

    Returns ``None`` when ``phrase`` is empty or ``None``; otherwise returns
    the cleaned string, which may be empty.
    """
    if not phrase:
        return None

    # (pattern, replacement) pairs, applied in this order on every pass.
    substitutions = (
        (r"^i (love|like|enjoy) ", ""),
        (r"^(i am|i'm|i'm) (a |an )?", ""),
        (r"^(i |a[n]?)\b", ""),
        (r"^(and|the|from|to)\b", ""),
        (r" of$", ""),
        (r'(on )?(snapchat|snap|ig|insta|instagram|email|phone): +[A-Za-z0-9_@.-]+', " "),
        (r'\u200d', ""),
    )
    # Characters trimmed from both ends of the clause on every pass.
    edge_chars = ".,/!-]+[#@:)(-?'$%&_"

    # Drop everything that is not plain ASCII before the iterative clean-up.
    current = phrase.encode('ascii', errors='ignore').decode().strip()

    while True:
        # Collapse runs of whitespace into single spaces.
        cleaned = re.sub(r"\s+", " ", current).strip()

        for pattern, replacement in substitutions:
            cleaned = re.sub(pattern, replacement, cleaned)

        cleaned = cleaned.replace("#", "")
        cleaned = cleaned.strip().strip(edge_chars).strip()
        cleaned = re.sub(r"[!\(\)?.\{\}]", " ", cleaned).strip()

        # Stop once a full pass leaves the text untouched.
        if cleaned == current:
            return cleaned
        current = cleaned


def generate_split_profile_description(description):
    """Split a profile description into cleaned clauses and collect its emojis.

    The description is lowercased and stripped of e-mail addresses, URLs and
    the ``&emsp;``/``&nbsp;`` entities. Every emoji is turned into a ``|``
    separator, the text is split on punctuation-like delimiters, and each
    clause is normalised with ``clean_personal_marker``.

    Args:
        description: Raw profile text. ``None`` or an empty string is accepted
            and yields an empty result.

    Returns:
        A ``(clauses, emojis)`` tuple: the list of non-empty cleaned clauses
        and the distinct emojis found in the description (which are treated
        as identity markers).
    """
    # A missing or empty description has no clauses and no emojis; return early
    # instead of failing on ``None.lower()``.
    if not description:
        return [], set()

    # remove URLs and email addresses
    d = re.sub(r'\w+@\w+\.\w+', '', description.lower()).strip()
    d = re.sub(r'http\S+', '', d).strip()
    d = d.replace("&emsp;", "").replace("&nbsp;", "")

    # get all emoji and remember them, then treat them as split characters
    emojis = distinct_emoji_list(d)
    if hasattr(emoji, "replace_emoji"):
        # get_emoji_regexp() was removed in emoji 2.0; replace_emoji is the
        # supported way to substitute every emoji.
        d = emoji.replace_emoji(d, replace="|")
    else:
        # Fallback for emoji releases that predate replace_emoji.
        d = emoji.get_emoji_regexp().sub("|", d)

    # Split on sensible separator characters. Hashtags and @mentions sit in a
    # capture group, so re.split returns them as clauses of their own. Capture
    # groups that did not take part in a match come back as None and are
    # dropped by the truthiness check below.
    pieces = re.split(
        r"[\(\)|•*;~°,\n\t]|[!…]+|[-–\/.]+ | [&+:]+ | [+] |([\/])(?=[A-Za-z ])|([.!-]{2,})| and |([#@][A-Za-z0-9_]+)",
        d)
    spl = [x for x in pieces
           if x and x.strip() != "" and x.strip() not in "|•&*#;~°.!…-/–"]

    # clean all clauses
    spl = [clean_personal_marker(x) for x in spl]
    # remove weird things and things that become empty
    spl = [x for x in spl
           if x and x.strip() != "" and x.encode() != b'\xef\xb8\x8f']
    return spl, emojis


def find_identifiers_simple(description, max_words=3):
    """Extract short clauses and the emojis from a profile description.

    Args:
        description: Raw profile text, passed to
            ``generate_split_profile_description``.
        max_words: Largest number of space-separated tokens a clause may have
            to be kept. The default of 3 matches the original hard-coded limit.

    Returns:
        A ``(clauses, emojis)`` tuple where ``clauses`` holds only the clauses
        with at most ``max_words`` tokens and ``emojis`` is passed through
        unchanged.
    """
    spl, emojis = generate_split_profile_description(description)
    spl = [s for s in spl if len(s.split(' ')) <= max_words]
    return spl, emojis


In [12]:
@F.udf(returnType=T.ArrayType(T.StringType()))
def extract_pid(bio):
    """Spark UDF returning the short identifier clauses found in a bio.

    Only the clause list from ``find_identifiers_simple`` is returned; the
    emojis are discarded.
    """
    # A NULL column value reaches a Python UDF as None; return an empty list
    # instead of letting the whole job fail on such rows.
    if bio is None:
        return []
    return find_identifiers_simple(bio)[0]

# Map every user id to the identifier clauses extracted from its description.
# users_df is already de-duplicated on `uid` where it is loaded, so no second
# dropDuplicates pass is applied here.
user_to_pi = (
    users_df
    .withColumn('pi', extract_pid('description'))
    .select('uid', 'pi')
)

In [13]:
# Collect the `pi` column on the driver as a plain Python list (one entry per
# row of user_to_pi), then preview the first ten entries.
pis_list = user_to_pi.toPandas()['pi'].tolist()
pis_list[:10]





                                                                                

[['doer', 'maker', 'photo-taker'],
 ['build things',
  'foursquare',
  'stockadefc',
  'streetfc',
  'dodgeball',
  'husband to',
  'chelsa',
  'dad to',
  'snowboards',
  'soccer',
  'hot dogs'],
 ['product', 'design leader', 'formerly', 'getfandom', 'ign', 'yahoo'],
 ['founder at',
  'hellofahren',
  'com',
  'previously: led',
  'gokartlabs',
  'ameriprise',
  'amex',
  'dad',
  'biker',
  'guitaristcreate more',
  'consume less'],
 ['compassionate technocrat', 'connector', 'instigator'],
 ['only human',
  'ceo',
  'visualisgood',
  'journalism',
  'vlogging',
  'blacklivesmatter'],
 ['anti-racist', 'anti-fascist', 'fucktrump', 'ftp'],
 ['mother',
  'designer',
  'coder',
  'writer',
  'singer',
  'complete geek',
  '296 89',
  'or my employer',
  'she',
  'her'],
 ['tweeting about startups',
  'product design',
  'development',
  'ceo',
  'todesktop',
  'yc w20'],
 ['redacteur trends magazine',
  'retail',
  'technologie',
  'nieuwsfreak',
  'internetnerd']]

In [14]:
# Number of identifier lists collected, i.e. one per row of user_to_pi.
len(pis_list)

15459872

In [15]:
# Destination of the pickled identifier lists. It can be overridden with the
# PIS_OUTPUT_PATH environment variable; the default keeps the original location.
PIS_OUTPUT_PATH = os.environ.get('PIS_OUTPUT_PATH', '/home/smadani/data/pis2020.pkl')

# NOTE: pickle files should only ever be loaded back from trusted locations.
with open(PIS_OUTPUT_PATH, 'wb') as f:
    pickle.dump(pis_list, f)