# Parallelization

The purpose of this workbook is to test parallelization and how and where to employ it in the LDA model. The following ressources serves as a good purpose for background information.
- https://docs.dask.org/en/latest/
- https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915
- https://towardsdatascience.com/how-i-learned-to-love-parallelized-applies-with-python-pandas-dask-and-numba-f06b0b367138

In [79]:
# Import moduls and 
# change directory to make it relative to top level moduls
import os,sys,inspect
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0,parentdir)

In [80]:
# Install a package in the current Jupyter kernel
# This portion can be out commented once it has been completed on a new system.
import sys
!conda install --yes --prefix {sys.prefix} dask

Solving environment: ...working... done

# All requested packages already installed.



In [81]:
# Import modules to be used in this package
import pandas as pd
from utils.lipht_data import getEngine

In [4]:
# Set the connection to the server
engine = getEngine('LIPHT-VM-01','Akademikernes_MSCRM_Addition')
query="""
        SELECT [ThreadID]
                        ,[ThreadTopic]
                        ,[SignalMessageID]
                        ,[SignalSource] AS ThreadInitiatedBy
                        ,[SignalMessageBodyClean]
                        ,[IsSystemGenerated]
                    FROM [Akademikernes_MSCRM_Addition].[out].[vw_LDA_MessagesALL]
    """

In [5]:
# Import data
df = pd.read_sql(query, engine).copy(deep=True)

In [6]:
df = df[['SignalMessageBodyClean']]

In [7]:
df_test = df.copy(deep=True)

In [8]:
import dask.dataframe as dd

In [9]:
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()

In [10]:
nCores

8

In [11]:
ddf = dd.from_pandas(df, npartitions=nCores)

In [12]:
def df_simple_clean_string(col_name):
    col_name=col_name.str.lower() # convert text to lower
    col_name=col_name.str.replace(r'^\n','') # remove newline in start of message
    col_name=col_name.str.replace(r'^((\n*\s*|\n*\S*\s*)(\b|\b\w)(kære|hej|hejsa|hello|hi|dear|til)\s?(\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w.-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{2,}|\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w.-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{2,}|\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w-]{0,}|\w[a-zæøåA-ZÆØÅ\w.-]{2,})?(|\n))','') # remove greeting
    col_name=col_name.str.replace(r'^\n','') # remove newline in start of message
    col_name=col_name.str.split(r'((med\svenlig\shilsen|de\sbedste\shilsner|bedste\shilsener|venlig\shilsen|hilsen|vh|best\sregards|with\skind\sregards|kind\sregards|with\sregards|regards|kh|sende\sfra\smin|sent\sfrom\smy\s).*)').str[0] # remove everything after polite ending
    col_name=col_name.str.replace(r'(?<=[^\d\W])\.','_') # replaces punctuation in URL's with a underscore
    col_name=col_name.str.replace(r'(?<=[^\W]\d)\.(?=\d)','') # replaces punctuation in numbers with nothing, to avoide splitting of numbers
    col_name=col_name.str.replace(r'[\.!]','') # Remove all punctuation and exclamationmarks
    return col_name

In [13]:
def df_clean_sting(col_name):
    from utils.lipht_regex import re_accronyms, re_keyconcepts, re_lemma, re_remove, re_system
    from bs4 import BeautifulSoup
    import re
    """ Cleans 'col_name' in a variaty of ways.
        Specificly designed with danish e-mails in mind."""

    def _clean_html(text):
        return BeautifulSoup(text, 'html.parser').get_text()

    def remove_accronyms(text):
        re_accronyms_comp = {re.compile(k): v for k, v in re_accronyms.items()}
        for pattern, replacement in re_accronyms_comp.items():
            text = pattern.sub(replacement, text)
        return text

    def make_keyconcepts(text):
       re_keyconcepts_comp = {re.compile(k): v for k, v in re_keyconcepts.items()}
       for pattern, replacement in re_keyconcepts_comp.items():
           text = pattern.sub(replacement, text)
       return text

    def lemmatize(text):
        re_lemma_comp = {re.compile(k): v for k, v in re_lemma.items()}
        for pattern, replacement in re_lemma_comp.items():
            text = pattern.sub(replacement, text)
        return text
            
    col_name=col_name.str.replace('</p><p>','</p>.\n<p>').astype(str) # Insert newline in paragraphs
    col_name=col_name.apply(_clean_html) # parse any HTML
    col_name=col_name.str.lower() # convert text to lower
    col_name=col_name.replace(regex=re_accronyms) # replace anything found in the dictionary: re_accronyms
    col_name=col_name.replace(regex=re_keyconcepts) # replace anything found in the dictionary: re_keyconcepts
    col_name=col_name.replace(regex=re_lemma) # replace anything found in the dictionary: re_lemma
    # df[col_name]=df[col_name].apply(remove_accronyms)
    # df[col_name]=df[col_name].apply(make_keyconcepts)
    # df[col_name]=df[col_name].apply(lemmatize)
    # df[col_name]=df[col_name].apply(keyword_processor.replace_keywords) # Use FlashText KeywordProcesser -> https://github.com/vi3k6i5/flashtext -> Build dict of replacements
    col_name=col_name.str.replace(r'^\n','') # remove newline in start of message
    col_name=col_name.str.replace(r'^((\n*\s*|\n*\S*\s*)(\b|\b\w)(kære|hej|hejsa|hello|hi|dear|til)\s?(\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w.-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{2,}|\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w.-]{0,}\s\w[a-zæøåA-ZÆØÅ\w-]{2,}|\w[a-zæøåA-ZÆØÅ\w-]{2,}\s\w[a-zæøåA-ZÆØÅ\w-]{0,}|\w[a-zæøåA-ZÆØÅ\w.-]{2,})?(|\n))','') # remove greeting
    col_name=col_name.str.replace(r'^\n','') # remove newline in start of message
    col_name=col_name.str.replace(r'(?<=[^\d\W])\.','_') # replaces punctuation in URL's with a underscore
    col_name=col_name.str.replace(r'(?<=[^\W]\d)\.(?=\d)','') # replaces punctuation in numbers with nothing, to avoide splitting of numbers
    col_name=col_name.str.split(r'((med\svenlig\shilsen|de\sbedste\shilsner|bedste\shilsener|venlig\shilsen|hilsen|vh|best\sregards|with\skind\sregards|kind\sregards|with\sregards|regards|kh|sende\sfra\smin|sent\sfrom\smy\s).*)').str[0] # remove everything after polite ending
    col_name=col_name.str.replace(r'^.*(tak\sfor\s(din\smail|en\sbehagelig\ssamtale|din\sbesked|svar|din\stilbagemelding|indsendte))\.?','')#.str[1] # remove everything before these statements 
    col_name=col_name.str.replace(r'((tak\sfor\shjælpen|på\sforhånd\stak|på\sforhånd\smange\stak|jeg\sser\sfrem\stil\sat\shøre\sdig|håber\sdet\svar\ssvar\snok|mange tak).*)','')#.str[0] # Remove every match and everything after. # Mail ending
    col_name=col_name.str.replace(r'((på\sforhånd\stak)|(((jeg|du)\sønske(s|r\s)(dig)?|(hav))\s)?(en\s)?((fortsat|rigtig)\s)?god(t)?\s(weekend|dag|ferie|jul|nytår|påske)|(held\sog\slykke\s(fremover)?)|(god\sarbejdslyst).*)','') # Remove every match and everything after. # Mail polite ending
    col_name=col_name.replace(regex=re_remove) # replace anything found in the dictionary: re_remove
    col_name=col_name.replace(regex=re_system) # replace anything found in the dictionary: re_system

    col_name=col_name.str.replace(r'^\n','') # remove newline in start of message
    col_name=col_name.str.strip()
    # df[col_name]=df[col_name].str.replace(r'[\w\.-]+@[\w\.-]+','') # remove e-mail

In [16]:
sampel_frac = 0.01

In [17]:
ddf_sample = ddf.sample(frac=sampel_frac)
ddf_sample.shape[0].compute()

16560

In [18]:
%%time
ddf_sample['SignalMessageBodyClean'] = ddf_sample['SignalMessageBodyClean'].map_partitions(df_clean_sting).compute()

Wall time: 1min 10s


In [26]:
ddf_sample.head(10)



Unnamed: 0,SignalMessageBodyClean
0,
1,
2,
3,
4,
5,
6,


In [20]:
ddf_sample2 = ddf.sample(frac=sampel_frac)
ddf_sample2.shape[0].compute()

16560

In [21]:
%%time
ddf_sample2['SignalMessageBodyClean'] = ddf_sample2['SignalMessageBodyClean'].map_partitions(df_simple_clean_string).compute()

Wall time: 1.83 s


In [27]:
ddf_sample2.head(20)



Unnamed: 0,SignalMessageBodyClean


In [137]:
# Emils træls AKA streng
attribute_mask = '5,28,18,71,87,91,96,78,19,23,17,35,82,20,11,21,85,74,73,80,10,90,77,16,81,13,7,8,37,75,79,36,34,6,24,9,22,2,76,72,30,12,15,83,88,14,93,86,84'
changedata = 'False~0~02/10/2014 22:40:07~~01/22/2014 12:36:00~~team,c8e9d375-6c90-e211-9da9-005056ad53c8~~1~02/12/2014 22:40:06~01/24/2014 12:36:00~~~contact,1515ace7-738f-e311-bf81-005056ad2d14~0~Plan for ledighed efter endt uddannelse som diplomingeniør~10380838~~~~10014~https://mit.akademikernes.dk/simple.aspx?func=iak.sequrelink&param=5WXv2GsVC7%2fQPTkgaRRHvPCVW7Gy4Ju%2bVHEFT%2fPEVVyHECrMLThQvsbf0yjUjTFEnKS4pDRYl44%2bGRHk0EpwbHl55vSdCXSDxAtPgPkSG2RZq8YUW6QvNrdcsNWjWSsbRez2vc0tHLvRxAAm6FX8hAKFJcZHzC0B~~systemuser,4dbe4e0a-558f-e311-bc61-0050569111ff~~10995~~02/18/2014 13:55:38~False~~~False~~systemuser,371f2045-4c8f-e311-bc61-0050569111ff~systemuser,5f11e8ec-0d8f-e311-8cfd-0050569111ff~~False~02/18/2014 13:53:10~~~True~02/20/2014 13:55:38~1~613728~False~~~613728_besked.xml~Plan for ledighed efter endt uddannelse som diplomingeniør'

l = [(item[0], item[1].split(",")) for item in list(zip(attribute_mask.split(","),changedata.split("~"))) if len(item[1])>0]
l

[('5', ['False']),
 ('28', ['0']),
 ('18', ['02/10/2014 22:40:07']),
 ('87', ['01/22/2014 12:36:00']),
 ('96', ['team', 'c8e9d375-6c90-e211-9da9-005056ad53c8']),
 ('19', ['1']),
 ('23', ['02/12/2014 22:40:06']),
 ('17', ['01/24/2014 12:36:00']),
 ('20', ['contact', '1515ace7-738f-e311-bf81-005056ad2d14']),
 ('11', ['0']),
 ('21', ['Plan for ledighed efter endt uddannelse som diplomingeniør']),
 ('85', ['10380838']),
 ('10', ['10014']),
 ('90',
  ['https://mit.akademikernes.dk/simple.aspx?func=iak.sequrelink&param=5WXv2GsVC7%2fQPTkgaRRHvPCVW7Gy4Ju%2bVHEFT%2fPEVVyHECrMLThQvsbf0yjUjTFEnKS4pDRYl44%2bGRHk0EpwbHl55vSdCXSDxAtPgPkSG2RZq8YUW6QvNrdcsNWjWSsbRez2vc0tHLvRxAAm6FX8hAKFJcZHzC0B']),
 ('16', ['systemuser', '4dbe4e0a-558f-e311-bc61-0050569111ff']),
 ('13', ['10995']),
 ('8', ['02/18/2014 13:55:38']),
 ('37', ['False']),
 ('36', ['False']),
 ('6', ['systemuser', '371f2045-4c8f-e311-bc61-0050569111ff']),
 ('24', ['systemuser', '5f11e8ec-0d8f-e311-8cfd-0050569111ff']),
 ('22', ['False']),
 

In [40]:
d = {item[0]: item[1].split(',') for item in list(zip(attribute_mask.split(','),changedata.split('~')))}

In [138]:
import pandas as pd
import numpy as np
p = pd.DataFrame([(item[0], item[1].split(',')) for item in list(zip(attribute_mask.split(','),changedata.split('~'))) if len(item[1])>0])
p.columns = ['entry', 'text'] # name columns
p[['value','identifier']] = pd.DataFrame(p.text.values.tolist(), index= p.index) # split text into two new columns
p = p[['entry','value','identifier']] # only keep necessary
# p = p.replace(r'\s+', np.nan, regex=True)
# p.dropna(subset=['value'])
p

Unnamed: 0,entry,value,identifier
0,5,False,
1,28,0,
2,18,02/10/2014 22:40:07,
3,87,01/22/2014 12:36:00,
4,96,team,c8e9d375-6c90-e211-9da9-005056ad53c8
5,19,1,
6,23,02/12/2014 22:40:06,
7,17,01/24/2014 12:36:00,
8,20,contact,1515ace7-738f-e311-bf81-005056ad2d14
9,11,0,


In [82]:
engine1 = getEngine('LIPHT-VM-01','Akademikernes_MSCRM')

In [130]:
query = """SELECT TOP (1000)
       [AttributeMask] = SUBSTRING([A].[AttributeMask], 2, LEN([A].[AttributeMask]) - 2)
     , ChangeData
	 ,AuditId
FROM   [dbo].[AuditBase] [A]"""

In [131]:
df = pd.read_sql(query, engine1).copy(deep=True)

In [132]:
df.head()

Unnamed: 0,AttributeMask,ChangeData,AuditId
0,21115824,"~0~1~02/10/2014 03:51:20~systemuser,5f11e8ec-0...",7A4CE450-A498-E311-9417-2C768A53AE6B
1,21115824,"~0~1~02/10/2014 03:51:20~systemuser,5f11e8ec-0...",7B4CE450-A498-E311-9417-2C768A53AE6B
2,21115824,"~0~1~02/10/2014 03:51:20~systemuser,5f11e8ec-0...",7C4CE450-A498-E311-9417-2C768A53AE6B
3,21115824,"~0~1~02/10/2014 03:51:21~systemuser,5f11e8ec-0...",7D4CE450-A498-E311-9417-2C768A53AE6B
4,1115824,"1~2~02/18/2014 13:53:10~systemuser,4dbe4e0a-55...",23A4A557-A498-E311-9417-2C768A53AE6B


In [133]:
# df = # datasæt med ["AuditId", "ChangeData", "AttributeMask"]
df["AttributeMask"] = df["AttributeMask"].str.split(",")
df["ChangeData"] = df["ChangeData"].str.split("~")
rows = []
for _, row in df.iterrows():
    [rows.append([row["AuditId"], key, val]) for key, val in zip(row["AttributeMask"], row["ChangeData"])]

df_out = pd.DataFrame(rows)
df_out.columns = ["AuditId", "AttributeMask","ChangeData"] # name columns

In [136]:
df_out.head(10)

Unnamed: 0,AuditId,AttributeMask,ChangeData
0,7A4CE450-A498-E311-9417-2C768A53AE6B,2,
1,7A4CE450-A498-E311-9417-2C768A53AE6B,11,0
2,7A4CE450-A498-E311-9417-2C768A53AE6B,15,1
3,7A4CE450-A498-E311-9417-2C768A53AE6B,8,02/10/2014 03:51:20
4,7A4CE450-A498-E311-9417-2C768A53AE6B,24,"systemuser,5f11e8ec-0d8f-e311-8cfd-0050569111ff"
5,7B4CE450-A498-E311-9417-2C768A53AE6B,2,
6,7B4CE450-A498-E311-9417-2C768A53AE6B,11,0
7,7B4CE450-A498-E311-9417-2C768A53AE6B,15,1
8,7B4CE450-A498-E311-9417-2C768A53AE6B,8,02/10/2014 03:51:20
9,7B4CE450-A498-E311-9417-2C768A53AE6B,24,"systemuser,5f11e8ec-0d8f-e311-8cfd-0050569111ff"


In [134]:
df.head()

Unnamed: 0,AttributeMask,ChangeData,AuditId
0,"[2, 11, 15, 8, 24]","[, 0, 1, 02/10/2014 03:51:20, systemuser,5f11e...",7A4CE450-A498-E311-9417-2C768A53AE6B
1,"[2, 11, 15, 8, 24]","[, 0, 1, 02/10/2014 03:51:20, systemuser,5f11e...",7B4CE450-A498-E311-9417-2C768A53AE6B
2,"[2, 11, 15, 8, 24]","[, 0, 1, 02/10/2014 03:51:20, systemuser,5f11e...",7C4CE450-A498-E311-9417-2C768A53AE6B
3,"[2, 11, 15, 8, 24]","[, 0, 1, 02/10/2014 03:51:21, systemuser,5f11e...",7D4CE450-A498-E311-9417-2C768A53AE6B
4,"[11, 15, 8, 24]","[1, 2, 02/18/2014 13:53:10, systemuser,4dbe4e0...",23A4A557-A498-E311-9417-2C768A53AE6B


In [120]:
%%time
rows = []
for _, row in df.iterrows():
    [rows.append([row['AuditId'], key, val]) for key, val in zip(row['AttributeMask'], row['ChangeData'])]

df_out = pd.DataFrame(rows)

Wall time: 383 ms


In [123]:

df_out.columns = ['AuditId', 'AttributeMask','ChangeData'] # name columns
df_out.head()

Unnamed: 0,AuditId,AttributeMask,ChangeData
0,ADE9D4E3-15BA-E711-80C9-0025B52A0006,5,False
1,ADE9D4E3-15BA-E711-80C9-0025B52A0006,28,0
2,ADE9D4E3-15BA-E711-80C9-0025B52A0006,18,05/14/2013 00:00:00
3,ADE9D4E3-15BA-E711-80C9-0025B52A0006,71,"contact,364b9071-858f-e311-bf81-005056ad2d14"
4,ADE9D4E3-15BA-E711-80C9-0025B52A0006,87,


In [111]:
def converter(attributeMask, changedata):
    l = [(item[0], item[1].split(',')) for item in list(zip(attribute_mask.split(','),changedata.split('~'))) if len(item[1])>0]
    return l

In [112]:
df['AuditLogData'] = df.apply(lambda x: converter(x['AttributeMask'], x['ChangeData']), axis=1)
df = df[['AuditId','AuditLogData']]

In [113]:
df.head()

Unnamed: 0,AuditId,AuditLogData
0,ADE9D4E3-15BA-E711-80C9-0025B52A0006,"[(CRM Setup, [False]), (5, [0]), (28, [05/14/2..."
1,61139A21-5BB6-E811-80D9-0025B52A0006,"[(CRM Setup, [False]), (5, [0]), (28, [09/12/2..."
2,8D183FB6-63B6-E811-80D9-0025B52A0006,"[(CRM Setup, [False]), (5, [0]), (28, [09/12/2..."
3,1C35F0F9-93B6-E811-80D9-0025B52A0006,"[(CRM Setup, [False]), (5, [0]), (28, [09/12/2..."
4,C0277EB2-1FB8-E811-80D9-0025B52A0006,"[(CRM Setup, [False]), (5, [0]), (28, [09/12/2..."
