In [None]:
from scipy import spatial
import numpy as np
from numpy.linalg import norm
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.rdd import RDD
# #from pyspark.sql import SQLContext, HiveContext
# from pyspark.sql.functions import broadcast
# from pyspark.sql.functions import lit, rand, concat
# from pyspark.sql.column import Column, _to_java_column, _to_seq
import subprocess
import argparse

In [None]:
# Build (or reuse) the notebook's SparkSession.
# SparkSession was never imported by the notebook's import cell (only SparkConf /
# SparkContext are), so this cell failed with NameError on a fresh kernel.
from pyspark.sql import SparkSession

# crossJoin.enabled is presumably set so the `!=` self-join over profiles is permitted.
# NOTE(review): getOrCreate() reuses an already-running session when one exists, and
# spark.driver.memory usually only takes effect when the driver JVM is launched --
# confirm these memory settings are actually applied in this environment.
spark = (
    SparkSession.builder
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "10g")
    .config("spark.memory.fraction", "0.35")
    .getOrCreate()
)

In [None]:
spark  # display the active SparkSession as this cell's output

In [None]:
# Per-profile, per-vertical viewing seconds for one playback country, persisted to Hive.
# NOTE(review): despite its name, `today` is used as a lower bound (p.dateint >= today),
# i.e. the start of the viewing window -- confirm intent.
today = 20200601
# Playback country used both in the filter and in the output table name.
country = "DE"

viewing_data = spark.sql("""
    select p.profile_id,
           case when vertical_group = '--' and vertical_category = 'Anime' then 'Anime'
                when vertical_group = '--' then 'Other'
                else vertical_group end as vertical,
           sum(view_secs) as vert_view_secs
    from dse.vrt_show_vertical_r v
    join dse.figment_profile_title_viewing p on p.show_title_id = v.show_title_id
     and p.dateint >= {start_dateint}
     and v.vertical_order_nbr = 1
     and p.playback_country_iso_code = '{country}'
    group by 1, 2
    having vert_view_secs > 1000
    limit 10000
""".format(start_dateint=today, country=country))
# NOTE(review): `limit 10000` caps (profile, vertical) ROWS with no ORDER BY, so the sample
# is nondeterministic and a profile can be kept with only some of its verticals, which
# skews its viewing vector downstream. Consider sampling whole profiles instead.

# registerTempTable is deprecated; createOrReplaceTempView is the supported equivalent.
viewing_data.createOrReplaceTempView("viewing_data")

# NOTE: CREATE TABLE ... AS fails if the table already exists (e.g. on a re-run).
spark.sql(
    "CREATE TABLE rmatai.profile_viewing_data_{} AS SELECT * from viewing_data".format(country)
)


In [None]:
# Pivot long (profile, vertical, seconds) rows into one row per profile with one column per
# vertical; verticals a profile never watched come out null and are filled with 0.
df = spark.table("rmatai.profile_viewing_data_FR_new1")
verticals_pivot = (
    df.groupBy("profile_id")
    .pivot("vertical")  # no explicit values: Spark runs an extra pass to find distinct verticals
    .agg(F.sum("vert_view_secs"))
)

verticals_pivot1 = verticals_pivot.na.fill(0)
# registerTempTable is deprecated; createOrReplaceTempView is the supported equivalent.
verticals_pivot1.createOrReplaceTempView("verticals_pivot1")
spark.sql("CREATE TABLE rmatai.profile_viewing_pivot_FR_new2 AS SELECT * from verticals_pivot1")
verticals_pivot1.show(20, False)


In [None]:
# Make the pivoted vertical names usable as plain column identifiers: spaces become '_'
# and the characters & ( ) - / are dropped.
sample_data = spark.table("rmatai.profile_viewing_pivot_FR_new2")


def _sanitize_column_name(name):
    """Return `name` with spaces replaced by '_' and '&', '(', ')', '-', '/' removed."""
    name = name.replace(" ", "_")
    for ch in "&()-/":
        name = name.replace(ch, "")
    return name


# All names are computed up front and applied in one positional toDF call. The previous
# chained withColumnRenamed(col, ...).withColumnRenamed(col, ...) silently skipped the second
# rename whenever the first one had already changed the name.
sanitized_columns = [_sanitize_column_name(c) for c in sample_data.columns]
assert len(set(sanitized_columns)) == len(sanitized_columns), \
    "column name collision after sanitizing: {}".format(sanitized_columns)
sample_data4 = sample_data.toDF(*sanitized_columns)


In [None]:
# Persist the renamed pivot table. registerTempTable is deprecated; use createOrReplaceTempView.
sample_data4.createOrReplaceTempView("sample_data4")

spark.sql("CREATE TABLE rmatai.profile_viewing_pivot_FR_new3 as select * from sample_data4")

In [None]:
# DataFrames have no `col_list` attribute; the column names live in `.columns`.
# The first column is expected to be the groupBy key `profile_id`; the rest are verticals.
col_list = sample_data4.columns
col_list[1:]

In [None]:
# Pack each profile's per-vertical seconds into a single array column (its viewing vector).
df1 = spark.table("rmatai.profile_viewing_pivot_FR_new3")

# Derive the feature columns from the table itself. `col_list` was otherwise only defined by
# out-of-order state. Exclude the key by name rather than assuming it is the first column.
col_list = df1.columns
vertical_cols = [c for c in col_list if c != "profile_id"]

view_vec = df1.withColumn("view_list", F.array(vertical_cols)).select("profile_id", "view_list")
# registerTempTable is deprecated; createOrReplaceTempView is the supported equivalent.
view_vec.createOrReplaceTempView("view_vec")
spark.sql("CREATE TABLE rmatai.profile_viewing_vec_FR as select * from view_vec")

In [None]:
# Every ordered pair of distinct profiles with both viewing vectors (both (a, b) and (b, a)).
# INNER rather than FULL OUTER: a full outer non-equi join is far more expensive and, for
# any profile with no match, emits rows with null vectors that break the cosine UDF.
spark.sql("""
    CREATE TABLE rmatai.profile_viewing_matrix_FR_new AS
    SELECT a.profile_id, a.view_list,
           b.profile_id AS connected_profile_id, b.view_list AS connected_view_list
    FROM rmatai.profile_viewing_vec_FR a
    INNER JOIN rmatai.profile_viewing_vec_FR b
      ON a.profile_id != b.profile_id
""")



In [None]:
#####Compute Cosine similarity between two profile's viewing vectors
@udf("float")
def cosine_udf(l, l2):
    """Cosine similarity of two equal-length numeric sequences, as a Spark float UDF.

    Returns null when either input is null or has zero norm. Cosine similarity is undefined
    there: scipy raises on None and yields NaN for a zero vector.
    """
    if l is None or l2 is None:
        return None
    if norm(l) == 0 or norm(l2) == 0:
        return None
    return float(1 - spatial.distance.cosine(l, l2))

# Score every profile pair in the matrix table with cosine_udf and persist the scores.
df = spark.table("rmatai.profile_viewing_matrix_FR_new")
result1 = df.select(
    df.profile_id,
    df.connected_profile_id,
    cosine_udf(df.view_list, df.connected_view_list).alias("cos_sim"),
)
# registerTempTable is deprecated; createOrReplaceTempView is the supported equivalent.
result1.createOrReplaceTempView("result1")
spark.sql("CREATE TABLE rmatai.profile_viewing_cosine_FR_new as select * from result1")


In [None]:
# Import the libraries
import matplotlib.pyplot as plt
import seaborn as sns

N_BINS = 20

# result1 is a Spark DataFrame: result1['cos_sim'] is a Column expression, not data, so it
# cannot be passed to matplotlib. Compute the histogram inside Spark instead of collecting
# every pair, since the pair table grows quadratically with the number of profiles.
# RDD.histogram skips None and NaN values and spaces the bins evenly between min and max.
bin_edges, bin_counts = (
    result1.select("cos_sim")
    .rdd.map(lambda row: row[0])
    .histogram(N_BINS)
)

fig, ax = plt.subplots()
ax.bar(bin_edges[:-1], bin_counts, width=np.diff(bin_edges), align="edge",
       color="blue", edgecolor="black")
ax.set_title('Distribution of Cosine Similarity')
ax.set_xlabel('Cosine Similarity')
# NOTE(review): if the pair table holds both (a, b) and (b, a), each undirected edge is counted twice.
ax.set_ylabel('Number of Edges')
plt.show()

In [None]:
# Unordered profile pairs (t1 < t2) that share a vertical both watched for > 2000 seconds.
# A triple-quoted string replaces the backslash continuations, which glued tokens together
# ("t2on", "t2.verticalwhere", "t2.profile_idand"). The second id column is aliased because
# CTAS rejects two output columns both named profile_id.
# NOTE(review): a pair sharing several qualifying verticals appears once per vertical;
# add DISTINCT if a plain adjacency list is intended.
spark.sql("""
    CREATE TABLE rmatai.profile_adjlist AS
    SELECT t1.profile_id, t2.profile_id AS connected_profile_id
    FROM rmatai.profile_verticals_viewing_sample t1
    INNER JOIN rmatai.profile_verticals_viewing_sample t2
      ON t1.vertical = t2.vertical
    WHERE t1.profile_id < t2.profile_id
      AND t1.vert_view_secs > 2000 AND t2.vert_view_secs > 2000
""")

In [None]:
from scipy import spatial
import numpy as np
from numpy.linalg import norm
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Score the sample pair table with the cosine_udf defined in the earlier cosine-similarity
# cell. Re-defining it here would silently shadow that definition.
df = spark.table("rmatai.profile_matrix_sample_FR2")

result1 = df.select(
    df.profile_id,
    df.connected_profile_id,
    cosine_udf(df.view_list, df.connected_view_list).alias("cos_sim"),
)
result1.show(20, False)

