In [1]:
from pyspark.sql.functions import concat, col, lit

In [2]:
import pandas as pd
import seaborn as sns
from datetime import datetime, timezone
import os
import matplotlib.pyplot as plt
# os.environ['PYSPARK_PYTHON'] = os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

In [7]:
os.environ['PYSPARK_PYTHON'] = os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/conda/bin/python'

In [None]:
!export PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python
!export PYSPARK_PYTHON=/opt/conda/bin/python

In [3]:
import os
import pyspark
from pyspark.sql import SQLContext, SparkSession
import socket
from pyspark.sql import functions as F


os.environ["HADOOP_USER_NAME"] = "hdfs"
PACKAGES = [
]

sc = SparkSession \
    .builder \
    .master('k8s://https://kubernetes.default.svc') \
    .appName("sinah_sequentialrulemining") \
    .config("spark.kubernetes.container.image.pullSecrets", "docker.yektanet.tech") \
    .config("spark.kubernetes.container.image", "docker.yektanet.tech/data-infra/jupyterlab/spark-executer:latest") \
    .config("spark.kubernetes.namespace", "jupyter") \
    .config("spark.driver.host", f"{socket.gethostname()}.notebooks") \
    .config("spark.kubernetes.executor.request.cores", "3") \
    .config("spark.kubernetes.executor.limit.cores", "6") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.instances", "2") \
    .config("spark.kubernetes.driver.pod.name", socket.gethostname()) \
    .config("spark.jars.packages", ",".join(PACKAGES)) \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true") \
    .config("spark.hadoop.parquet.enable.summary-metadata", "false") \
    .config("spark.sql.parquet.mergeSchema", "false") \
    .config("spark.sql.parquet.filterPushdown", "true") \
    .config("spark.sql.hive.metastorePartitionPruning", "true") \
    .config("spark.sql.orc.filterPushdown", "true") \
    .config("spark.sql.orc.splits.include.file.footer", "true") \
    .config("spark.sql.orc.cache.stripe.details.size", "10000") \
    .config("spark.sql.hive.metastorePartitionPruning", "true") \
    .getOrCreate()
    
sqlContext = SQLContext(sparkContext=sc.sparkContext, sparkSession=sc)
print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)

Spark Version: 3.1.1
PySpark Version: 3.1.2


## Campaign Data

In [9]:
campaign_id_df = pd.read_csv('campaign-id.csv')

In [10]:
campaign_kw_df = pd.read_csv('campaign-kw.csv')
kw_id_df = pd.read_csv('kw-id.csv')

In [11]:
kw_id_df.head()

Unnamed: 0,ID,Keyword
0,1,قرنطینه
1,2,فرهاد مجیدی
2,3,ویروس کرونای
3,4,پرسپولیس
4,5,ویروس کرونا


In [12]:
campaign_kw_df.drop(["ID"], axis=1)

Unnamed: 0,Campaign ID,Keyword ID
0,55707,4640
1,54487,789
2,55707,4737
3,55707,4802
4,55923,5089
...,...,...
116129,68138,20
116130,68138,277
116131,68138,4631
116132,68138,23


In [13]:
campaign_kw_merged = pd.merge(campaign_kw_df.drop(["ID"], axis=1), kw_id_df, how="left", left_on="Keyword ID", right_on="ID")


In [9]:
campaign_kw_merged.drop(['ID'], axis=1, inplace=True)

In [10]:
campaign_kw_agg = campaign_kw_merged.groupby('Campaign ID')['Keyword'].apply(list).reset_index(name='KWs')

In [11]:
campaign_kw_agg = pd.merge(campaign_kw_agg, campaign_id_df.loc[:,['Campaign ID', 'Created At']], how='left', on=['Campaign ID'])

In [12]:
# campaign_kw_agg = campaign_kw_agg[campaign_kw_agg["Created At"]> datetime()]



In [13]:
campaign_kw_agg.head()

Unnamed: 0,Campaign ID,KWs,Created At
0,45153,"[ترکیه, آلمان, بازار, کانادا, کسب و کار, مهاجر...",
1,45912,"[معاملات, سهام, آموزش بورس, بورس, سیگنال خرید,...",
2,46885,"[عید نوروز, کاردستی, اوقات فراقت, عروسک, سرگرم...",2020-05-07T19:14:44.633934+04:30
3,48084,"[شال, تونیک, بلوز, کیف, مزون, مایو, شلوارکتان,...",2020-05-18T01:14:32.449458+04:30
4,48125,"[آشپز, تابستان, کاردستی, عروسک, کودکانه, یادگی...",2020-05-18T12:33:13.049959+04:30


# semantic

In [14]:
english_stopwords = ["ourselves", "hers", "between", "yourself", "but", "again", "there", "about", "once", "during", "out", "very", "having", "with", "they", "own", "an", "be", "some", "for", "do", "its", "yours", "such", "into", "of", "most", "itself", "other", "off", "is", "s", "am", "or", "who", "as", "from", "him", "each", "the", "themselves", "until", "below", "are", "we", "these", "your", "his", "through", "don", "nor", "me", "were", "her", "more", "himself", "this", "down", "should", "our", "their", "while", "above", "both", "up", "to", "ours", "had", "she", "all", "no", "when", "at", "any", "before", "them", "same", "and", "been", "have", "in", "will", "on", "does", "yourselves", "then", "that", "because", "what", "over", "why", "so", "can", "did", "not", "now", "under", "he", "you", "herself", "has", "just", "where", "too", "only", "myself", "which", "those", "i", "after", "few", "whom", "t", "being", "if", "theirs", "my", "against", "a", "by", "doing", "it", "how", "further", "was", "here", "than"]
persian_stopwords = ["و", "در", "به", "از", "که", "این", "را", "با", "است", "برای", "آن", "یک", "خود", "تا", "کرد", "بر", "هم", "نیز", "گفت", "می\u200cشود", "وی", "شد", "دارد", "ما", "اما", "یا", "شده", "باید", "هر", "آنها", "بود", "او", "دیگر", "دو", "مورد", "می\u200cکند", "شود", "کند", "وجود", "بین", "پیش", "شده_است", "پس", "نظر", "اگر", "همه", "یکی", "حال", "هستند", "من", "کنند", "نیست", "باشد", "چه", "بی", "می", "بخش", "می\u200cکنند", "همین", "افزود", "هایی", "دارند", "راه", "همچنین", "روی", "داد", "بیشتر", "بسیار", "سه", "داشت", "چند", "سوی", "تنها", "هیچ", "میان", "اینکه", "شدن", "بعد", "جدید", "ولی", "حتی", "کردن", "برخی", "کردند", "می\u200cدهد", "اول", "نه", "کرده_است", "نسبت", "بیش", "شما", "چنین", "طور", "افراد", "تمام", "درباره", "بار", "بسیاری", "می\u200cتواند", "کرده", "چون", "ندارد", "دوم", "بزرگ", "طی", "حدود", "همان", "بدون", "البته", "آنان", "می\u200cگوید", "دیگری", "خواهد_شد", "کنیم", "قابل", "یعنی", "رشد", "می\u200cتوان", "وارد", "کل", "ویژه", "قبل", "براساس", "نیاز", "گذاری", "هنوز", "لازم", "سازی", "بوده_است", "چرا", "می\u200cشوند", "وقتی", "گرفت", "کم", "جای", "حالی", "تغییر", "پیدا", "اکنون", "تحت", "باعث", "مدت", "فقط", "زیادی", "تعداد", "آیا", "بیان", "رو", "شدند", "عدم", "کرده_اند", "بودن", "نوع", "بلکه", "جاری", "دهد", "برابر", "مهم", "بوده", "اخیر", "مربوط", "امر", "زیر", "گیری", "شاید", "خصوص", "آقای", "اثر", "کننده", "بودند", "فکر", "کنار", "اولین", "سوم", "سایر", "کنید", "ضمن", "مانند", "باز", "می\u200cگیرد", "ممکن", "حل", "دارای", "پی", "مثل", "می\u200cرسد", "اجرا", "دور", "منظور", "کسی", "موجب", "طول", "امکان", "آنچه", "تعیین", "گفته", "شوند", "جمع", "خیلی", "علاوه", "گونه", "تاکنون", "رسید", "ساله", "گرفته", "شده_اند", "علت", "چهار", "داشته_باشد", "خواهد_بود", "طرف", "تهیه", "تبدیل", "مناسب", "زیرا", "مشخص", "می\u200cتوانند", "نزدیک", "جریان", "روند", "بنابراین", "می\u200cدهند", "یافت", "نخستین", "بالا", "پنج", "ریزی", "عالی", "چیزی", "نخست", "بیشتری", "ترتیب", "شده_بود", "خاص", "خوبی", "خوب", "شروع", "فرد", "کامل", "غیر", "می\u200cرود", "دهند", "آخرین", "دادن", "جدی", "بهترین", "شامل", "گیرد", "بخشی", "باشند", "تمامی", "بهتر", "داده_است", "حد", "نبود", "کسانی", "می\u200cکرد", "داریم", "علیه", "می\u200cباشد", "دانست", "ناشی", "داشتند", "دهه", "می\u200cشد", "ایشان", "آنجا", "گرفته_است", "دچار", "می\u200cآید", "لحاظ", "آنکه", "داده", "بعضی", "هستیم", "اند", "برداری", "نباید", "می\u200cکنیم", "نشست", "سهم", "همیشه", "آمد", "اش", "وگو", "می\u200cکنم", "حداقل", "طبق", "جا", "خواهد_کرد", "نوعی", "چگونه", "رفت", "هنگام", "فوق", "روش", "ندارند", "سعی", "بندی", "شمار", "کلی", "کافی", "مواجه", "همچنان", "زیاد", "سمت", "کوچک", "داشته_است", "چیز", "پشت", "آورد", "حالا", "روبه", "سال\u200cهای", "دادند", "می\u200cکردند", "عهده", "نیمه", "جایی", "دیگران", "سی", "بروز", "یکدیگر", "آمده_است", "جز", "کنم", "سپس", "کنندگان", "خودش", "همواره", "یافته", "شان", "صرف", "نمی\u200cشود", "رسیدن", "چهارم", "یابد", "متر", "ساز", "داشته", "کرده_بود", "باره", "نحوه", "کردم", "تو", "شخصی", "داشته_باشند", "محسوب", "پخش", "کمی", "متفاوت", "سراسر", "کاملا", "داشتن", "نظیر", "آمده", "گروهی", "فردی", "ع", "همچون", "خطر", "خویش", "کدام", "دسته", "سبب", "عین", "آوری", "متاسفانه", "بیرون", "دار", "ابتدا", "شش", "افرادی", "می\u200cگویند", "سالهای", "درون", "نیستند", "یافته_است", "پر", "خاطرنشان", "گاه", "جمعی", "اغلب", "دوباره", "می\u200cیابد", "لذا", "زاده", "گردد", "اینجا"]
stopwords = set(english_stopwords + persian_stopwords + ['nan', 'NAN', 'Nan', 'None'])

In [15]:
import gensim
from gensim.models import Word2Vec
from gensim.models import KeyedVectors
import numpy as np

w2v_path = '../../qasem/utils/w2v_model/w2v/w2v_model_latest'
w2v = gensim.models.KeyedVectors.load(w2v_path)

w2v = w2v.wv

EMB_DIM = 100

from hazm import Normalizer, stopwords_list
import re

normalizer = Normalizer()

def clean_text(text, stopwords=stopwords_list()):
    print("hi")
    !pip install gensim
    !pip install hazm
    
    from hazm import Normalizer, stopwords_list
    import re
    import gensim
    if text is None:
        return None
    if type(text) is not str:
        return ''
    text = re.sub(r"\W+|[0-9]|[۰-۹]", ' ', str(text))
    text = normalizer.normalize(text)
    text_split = text.split()

    text_split = ' '.join([t.lower() for t in text_split
                  if (t and t not in stopwords)])
    return text_split



## url text emb

In [16]:
def embed_query(query, query_type='plain_text'):
    import gensim
    from gensim.models import Word2Vec
    from gensim.models import KeyedVectors
    import numpy as np

    if query is None:
        return None
    
    # LOOK HOW THEY ARE SEPARATED
    
    if query_type == 'campaign':
        pass
    else:
        query = query.split(' - ')
    
    query_embedding = np.zeros(EMB_DIM)
    for w in query:
        w_emb = np.mean([w2v[wo] for wo in w.split() if (wo in w2v.index_to_key)], axis=0)
        if not np.isnan(w_emb).any():
            query_embedding += w_emb
    query_embedding /= len(query)
    return query_embedding.tolist()
#     return [np.float16(x) for x in query_embedding.tolist()]

## Campaign Emb

In [19]:
campaign_kw_agg['emb'] = campaign_kw_agg.KWs.apply(embed_query, query_type='campaign')

  return _methods._mean(a, axis=axis, dtype=dtype,


KeyboardInterrupt: 

In [None]:
campaign_kw_agg

# Imp Data

In [None]:
df_imp = sc.read.parquet('hdfs://hdfs.hdfs/datalake/factimpression/test_04/factimpression/')

In [5]:
df_imp.count()

3455547

In [8]:
dff = df_imp.sample(fraction=0.01, seed=3)

In [10]:
dff.cache()
dff = dff.toPandas()

KeyboardInterrupt: 

In [None]:
dff.shape

In [24]:
df_imp.head()

Row(impression_id='CLa9UKph1qSJHaQLE_G-kyoZmbi0L_WrsanMhUHIyrw', impression_datetime=datetime.datetime(2021, 8, 1, 5, 33, 44), global_token='09aec082-298e3-362a1-1fef1-101a9a3681f5c', page_domain='www.tasnimnews.com', page_path='/fa/news/1393/08/30/552239/%D8%B3%D9%81%D8%B1%D9%87-%D8%A7%DB%8C-%D8%A8%D9%87-%D9%88%D8%B3%D8%B9%D8%AA-%D8%AD%D8%AC-%D9%81%D9%82%D8%B1%D8%A7-%D8%AF%D9%84%DB%8C-%DA%A9%D9%87-%D8%A8%D8%B1%D8%A7%DB%8C-%DB%8C%DA%A9-%D8%BA%D8%B0%D8%A7%DB%8C-%D8%AD%D8%B6%D8%B1%D8%AA-%D8%B9-%D9%85%DB%8C-%D8%AA%D9%BE%D8%AF', ad_id=297485, campaign_id=71070, advertiser_id=7170, position_id=14504, publisher_id=209, ssp='native', dsp='native', device='desktop', ip='80.191.203.92', bid=1, trueImpression_datetime=None, click_datetime=None, click_device=None, server_id=None, is_trueImpression=0, is_clicked=0, impression_date=datetime.date(2021, 8, 1))

In [None]:
df_event = sc.read.parquet(*['hdfs://hdfs.hdfs/data/etl/analytics/events/app.analytics.ua.events.v1/year=2021/month=08/day=%d' % day for day in range(17,23)])

In [17]:
df_scrapper = sc.read.parquet('hdfs://hdfs.hdfs/data/etl/pg/scrapper/pg.scrapper.url-content/')
df_scrapper = df_scrapper.select('id', 'url', 'title', 'description', 'domain')
df_scrapper = df_scrapper.withColumn('text',
    concat(col('title'), lit(" "), col('description'))).drop('title', 'description')

In [18]:
df_scrapper.head(1)

[Row(id=19199789, url='khabarvarzeshi.com/news/323038/%D8%AA%D9%88%D8%B5%DB%8C%D9%81-%DB%8C%DA%A9-%D8%B3%D8%AA%D8%A7%D8%B1%D9%87-%D8%A7%D8%B2-%D9%85%D8%B1%DA%AF-%D9%88-%D8%A2%D9%88%D8%A7%D8%B1%DA%AF%DB%8C-%D9%85%D8%B1%D8%AF%D9%85-%D8%AA%D8%B5%D9%88%D8%B1-%D9%86%D9%85%DB%8C-%DA%A9%D8%B1%D8%AF%D9%85-%DA%86%D9%86%DB%8C%D9%86-%D9%81%D8%A7%D8%AC%D8%B9%D9%87-%D8%A7%DB%8C', domain='khabarvarzeshi.com', text='توصیف یک ستاره از مرگ و آوارگی مردم تصور نمی کردم چنین فاجعه ای در کشورم رخ دهد کای هاورتس ستاره آلمانی چلسی از خرابی هایی که سیل در آلمان و در شهر زادگاه او ایجاد کرده حرف زده است ')]

In [19]:
from pyspark.sql.types import *

In [20]:
from pyspark.sql.functions import udf

udf_clean = udf(clean_text, StringType()) 

In [21]:
df_scrapper = df_scrapper.withColumn('cleaned', udf_clean('text'))

In [22]:
df_scrapper.head()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hazm'


In [1]:
!which python

/opt/conda/bin/python


## URL embedding

In [None]:
udf_emb = udf(embed_query, ArrayType(FloatType())) 

In [None]:
df_scrapper = df_scrapper.withColumn('emb', udf_emb('text'))

In [None]:
df_scrapper.head(1)

## Merge Everything

In [None]:
udf_url = udf(
    (lambda x,y: x+y),
    StringType()) 

In [None]:
df_imp = df_imp.withColumn('url', udf_url('page_domain', 'page_path'))

In [None]:
df_imp.head(1)

In [None]:
df_imp = df_imp.select('url', 'campaign_id', 'is_clicked', 'is_trueImpression')

In [None]:
df_imp.head()

In [None]:
len(final_df)

In [None]:
final_df = df_imp.join(df_scrapper, on='url')

In [None]:
df_imp.count()

In [None]:
final_df.write.format("parquet").save("hdfs://hdfs.hdfs:/qasem_data/imp_clk_semantic/imp_agg_data")