In [1]:
#####################################################################################
# Import Libraries
#####################################################################################
from collections import defaultdict
#import seaborn as sns
#import matplotlib.pyplot as plt
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext,functions as F
from pyspark.sql.functions import col

In [2]:

import time
def get_epochs(val):
    return int(time.mktime(time.strptime(val, '%Y%m%d%H%M%S')))



def set_spark_session():
    try:
        spark = SparkSession \
                .builder \
                .appName(P_APP_NAME) \
                .config('spark.driver.memory', P_DRIVER_MEMORY) \
                .config('spark.executor.memory', P_EXECUTOR_MEMORY) \
                .config('spark.executor.cores',P_EXECUTOR_CORES) \
                .config('spark.mongodb.input.uri', P_INPUT_HOST_NAME) \
                .config('spark.mongodb.output.uri', P_OUTPUT_HOST_NAME) \
                .config('spark.sql.pivotMaxValues', P_PIVOT_MAX_VALUES) \
                .config('spark.jars.packages', P_MONGO_SPARK_CONNECTOR) \
                .config('spark.sql.inMemoryColumnarStorage.compressed',True) \
                .config('spark.sql.broadcastTimeout',"36000") \
                .getOrCreate()
 
    except Exception as e:
        #print("Error in SparkSession  -  "+  str(e))
        logging.exception("EXCEPTION -  Setting SparkSession")
        raise e    
        
    # Set SQL Context
    sqlContext=SQLContext(spark)

    return sqlContext,spark


def load_user_event_logs(spark):
    
    
    #Anonymous logs
    df_a = spark.read.format("json").load(P_EVENT_LOGS_A).filter( col("propertyId").isNotNull() & 
                                                             col("userId").isNotNull()
                                                    )     
    #Non-Anonymous logs
    df_na = spark.read.format("json").load(P_EVENT_LOGS_NA).filter( col("propertyId").isNotNull() & 
                                                             col("userId").isNotNull()
                                                    )    
    
    
    #merge both datasets
    df_a.registerTempTable("df_tbl")
    df_na.registerTempTable("df_na_tbl")
    
    df_union=spark.sql('''
    select userId as USER_ID, propertyId as ITEM_ID, replace(propertyCountry,'Canada','CA') as COUNTRY, get_epochs(eventTimestamp) as TIMESTAMP from df_tbl
    UNION ALL
    select NVL(NVL(contactId.Id, userId),1) as USER_ID, propertyId as ITEM_ID, replace(propertyCountry,'Canada','CA') as COUNTRY, get_epochs(eventTimestamp) as TIMESTAMP from df_na_tbl
    ''')
    return df_union

   


In [3]:
 def filter_inactive_properties(df):
        
        from pymongo import MongoClient
        from bson.objectid import ObjectId
        import pandas as pd
        
        #g_input="mongodb://10.0.1.70"
        #g_input="mongodb://3.210.155.32"
        
        
        
        client = MongoClient(P_INPUT_MONGO_HOST,P_INPUT_MONGO_PORT)
        
        #client = MongoClient('mongodb://localhost:27017')


        db = client.backend_production

        df_pd = pd.DataFrame(columns=['propertyId'])
        
        l_cnt=0
        
        for i in df.select('item_id').distinct().collect():
            #cnt= db.properties.find( {'mls_number': 'W4505836', 'status': 'active'}).count()
            cnt= db.properties.find( {'_id': ObjectId(i[0]), 'status': 'active'}).count()
            if cnt == 0:
                l_cnt=l_cnt+1
                df_pd.loc[l_cnt] = [i[0]]
        
        #print(df_pd.shape)
        spark_df = sqlContext.createDataFrame(df_pd) 
        
        # Create tables
        spark_df.registerTempTable("spark_df_tbl")
        df.registerTempTable("df_tbl")
        
        df_active=spark.sql('''
                            select * from df_tbl a where 1=1 
                            and not exists 
                                    (select 1 from spark_df_tbl b 
                                     where a.item_id=b.propertyId)

                            ''')
        df_filter_active=df_active.select('user_id', 'item_id', 'country', 'timestamp').distinct()

        
        return df_filter_active 
  

In [4]:
  
def events_popularity_score(df):
        
        df.registerTempTable("df_tbl")
        
        df_popularity=spark.sql('''
                        select item_id,country, count(distinct USER_ID) as count_score
                        from df_tbl group by 1,2 order by 3 desc
                        ''')
        
        '''
        df_popularity=df_popularity.coalesce(1)
        from pyspark.sql.functions import monotonically_increasing_id
        df_popularity=df_popularity.withColumn('score', monotonically_increasing_id())
        '''
        
        from pyspark.sql import Window
        import pyspark.sql.functions as psf

        cnt_score = Window.orderBy(psf.desc("count_score"))

        df_popularity_sc = df_popularity.withColumn("score", 
                            psf.dense_rank().over(cnt_score)
                            )
        
        return df_popularity_sc

In [5]:
def update_score_in_mongo(df):
    
    #df=df.select('_id','score')
    
    import pyspark.sql.functions as sfunc
    from pyspark.sql.types import StructType
    

   
    udf_struct_id = sfunc.udf(
        lambda x: tuple((str(x),)), 
        StructType([StructField("oid",  StringType(), True)])
    )

        
    df = df.withColumn('_id', udf_struct_id('_id'))
    

    
    df.write.format("com.mongodb.spark.sql.DefaultSource")\
            .mode("append") \
            .option("database","backend_production")\
            .option("collection", "properties")\
            .option("replaceDocument", "false")\
            .save()
    #Above will always append new fields to the existing records and will not change/touch existing fields
    #use this to update score in mongo for popular matrix


In [6]:
def find_event_items_similarities(df, df_ca):
    df.registerTempTable("df_tbl")
    df_ca.registerTempTable("df_ca_tbl")
    
    df_r=spark.sql('''
        select df_ca.id1 as _id, min((df.score + df_ca.score)) as score
        from df_tbl df, 
             df_ca_tbl df_ca
        where df.item_id=  df_ca.id2
        group by 1
    ''')
    
    #df_r.cache()
    
    return df_r
    
    

In [7]:
def normalize_score(df):
    
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.feature import StandardScaler


    assembler = VectorAssembler(
        inputCols=["score"],
        outputCol="score_v")

    output = assembler.transform(df)
    
    # Normalize each Vector using $L^1$ norm.
    
    scaler = StandardScaler(inputCol="score_v", outputCol="popularity_score",
                        withStd=False, withMean=True)

    # Compute summary statistics by fitting the StandardScaler
    scalerModel = scaler.fit(output)

    # Normalize each feature to have unit standard deviation.
    scaledData = scalerModel.transform(output)
    
    return scaledData

In [7]:
def get_similar_properties(df_similar_items, item_id):
    return df_similar_items.filter(col('_id')==item_id )
    

In [9]:
'''
from nest_constants import *
from nest_similar_prop_main import *
if __name__ == "__main__":
    
    sqlContext,spark=load_spark()
    
    spark.udf.register("ttime", ttime) 
    
    df_union=load_events(spark)
    print("Total number of user events", df_union.count())
    
    df_active=load_active_properties(df_union)
    print("Total number of user events having active items", df_active.count())
    
    df=popularity(df_active)
    print("popularity done")
    
    df_ca,df_us=similar_properties(spark,sqlContext)
    
    print("similarity done")
    df_ca.show(10, False)
    
    df_ca_final=event_similarities(df,df_ca)
    print("adding similarity score to popularity done")
    df_ca_final.cache()
    
    #df_ca_final.show(10, False)
    update_mongo_score(df_us_final)
    #df_ca=similarity_main_ca()
    #df.show(100, False)
'''

'\nfrom nest_constants import *\nfrom nest_similar_prop_main import *\nif __name__ == "__main__":\n    \n    sqlContext,spark=load_spark()\n    \n    spark.udf.register("ttime", ttime) \n    \n    df_union=load_events(spark)\n    print("Total number of user events", df_union.count())\n    \n    df_active=load_active_properties(df_union)\n    print("Total number of user events having active items", df_active.count())\n    \n    df=popularity(df_active)\n    print("popularity done")\n    \n    df_ca,df_us=similar_properties(spark,sqlContext)\n    \n    print("similarity done")\n    df_ca.show(10, False)\n    \n    df_ca_final=event_similarities(df,df_ca)\n    print("adding similarity score to popularity done")\n    df_ca_final.cache()\n    \n    #df_ca_final.show(10, False)\n    update_mongo_score(df_us_final)\n    #df_ca=similarity_main_ca()\n    #df.show(100, False)\n'

In [8]:
#db.properties.find( {'_id': ObjectId('5c7fb54364ad71000120c25f'), 'status': 'active'}).count()

In [8]:
from nest_constants import *
from nest_similar_prop_main import *
#if __name__ == "__main__":
    
sqlContext,spark=set_spark_session()
spark.udf.register("get_epochs", get_epochs)     

df_union=load_user_event_logs(spark)
print("Total number of user events", df_union.count())




Total number of user events 29534


In [9]:
   
df_active=filter_inactive_properties(df_union)
print("Total number of user events having active items", df_active.count())
    



Total number of user events having active items 4864


In [11]:
df_active.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [26]:
# For STATS only

df_active.registerTempTable("df_active_tbl")
spark.sql('''
select country, count(distinct item_id ) from df_active_tbl group by 1
''').show(10,False)


+-------+-----------------------+
|country|count(DISTINCT item_id)|
+-------+-----------------------+
|null   |31                     |
|CA     |658                    |
|US     |1603                   |
+-------+-----------------------+



In [12]:
df=events_popularity_score(df_active)
print("popularity done")


popularity done


In [13]:
df.printSchema()

root
 |-- item_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count_score: long (nullable = false)
 |-- score: integer (nullable = true)



In [32]:
df.count()

2292

In [14]:
ca_data= df.filter(col('COUNTRY')=='CA')
us_data= df.filter(col('COUNTRY')=='US')


In [14]:
ca_data.printSchema()

root
 |-- item_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count_score: long (nullable = false)
 |-- score: integer (nullable = true)



In [26]:
ca_data.show(10, False)

+------------------------+-------+-----------+-----+
|item_id                 |country|count_score|score|
+------------------------+-------+-----------+-----+
|5d1faf9f650b160001617b1f|CA     |9          |4    |
|5c8a0f9abe7c7e00011c3803|CA     |7          |5    |
|5d1faf43650b160001617b12|CA     |6          |6    |
|5cbaefaa650b16000152c3c5|CA     |5          |7    |
|5cc448d1650b160001552358|CA     |5          |7    |
|5cf54a1e650b1600015bfa01|CA     |4          |8    |
|5d1d4488650b160001617628|CA     |4          |8    |
|5d1faf89650b160001617b1c|CA     |4          |8    |
|5d189e58650b160001616dc7|CA     |4          |8    |
|5d189800650b160001616a1e|CA     |3          |9    |
+------------------------+-------+-----------+-----+
only showing top 10 rows



In [15]:
#this is for ca popularity score for popularity personalization
# this is for ca user personalization

df_ca_similar_items, df_us_similar_items= similar_properties(spark,
                                        sqlContext,
                                        ca_data,
                                        us_data
                                        )
print("similarity done")
#df_ca.show(10, False)
    
#CA took  209.25046396255493 (without clearCache() and table cache)
#CA took  926.3730092048645 (with clearCache() and df cache)
#CA took  328.2677409648895 (without clearCache() and df cache)
#CA took  372.06903076171875 (without clearCache() and df cache)
#CA took  311.6600856781006 (without clearCache() and df cache)
#CA took  367.44630002975464 (without clearCache() and table cache)
#CA took  916.6582391262054 (with clearCache() and table cache)

#Summary -
#cache table or dataframe works the same
#sqlContext.clearCache() slows down the things so should be removed

CA took  1014.8570041656494
similarity done


In [16]:
df_ca_similar_items.cache()
df_us_similar_items.cache()


DataFrame[id1: string, id2: string, similarity_score: double, popularity_score: int]

In [20]:
df_ca_similar_items.printSchema()

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- similarity_score: double (nullable = true)
 |-- popularity_score: integer (nullable = true)



In [19]:
df_ca_similar_items.show(10, False)

+------------------------+-------+
|_id                     |score  |
+------------------------+-------+
|5c7fb54364ad71000120c25f|10.0   |
|5d35b14d106d4e00011f3462|10.4074|
|5d343b8c106d4e000119aa57|10.4074|
|5cfef7c8650b1600015db429|10.4074|
|5d35c21b106d4e00011f5294|10.4074|
|5d019b1c650b1600015e6e27|11.0   |
|5d35b9f2106d4e00011f425e|11.1818|
|5d35b9d2106d4e00011f4228|11.1818|
|5d35b981106d4e00011f4134|11.1818|
|5d35b86f106d4e00011f3f0b|11.1818|
+------------------------+-------+
only showing top 10 rows



In [22]:
df_ca_similar_items.show(10, False)

+------------------------+------------------------+-------------------+----------------+
|id1                     |id2                     |similarity_score   |popularity_score|
+------------------------+------------------------+-------------------+----------------+
|5c7fb54364ad71000120c25f|5c7fb54364ad71000120c25f|0.0                |10              |
|5cfef7c8650b1600015db429|5c7fb54364ad71000120c25f|0.40740740740740744|10              |
|5d35b14d106d4e00011f3462|5c7fb54364ad71000120c25f|0.40740740740740744|10              |
|5d35c21b106d4e00011f5294|5c7fb54364ad71000120c25f|0.40740740740740744|10              |
|5d343b8c106d4e000119aa57|5c7fb54364ad71000120c25f|0.40740740740740744|10              |
|5d019b1c650b1600015e6e27|5d019b1c650b1600015e6e27|0.0                |11              |
|5d35bba6106d4e00011f45b1|5d019b1c650b1600015e6e27|0.18181818181818177|11              |
|5d35b86f106d4e00011f3f0b|5d019b1c650b1600015e6e27|0.18181818181818177|11              |
|5d35b981106d4e00011f

In [25]:
df_ca_similar_items.registerTempTable("df_ca_similar_items_tbl")

In [40]:
df_ca_mongo=spark.sql('''
select id1 as _id, min(similarity_score + popularity_score) as score
from df_ca_similar_items_tbl
group by 1

''')

In [37]:
df_ca_mongo.registerTempTable("df_ca_mongo_tbl")

In [38]:
spark.sql('''
select id1, count(1) from df_ca_mongo_tbl  group by 1 having count(1) > 1
''').show(100, False)

+---+--------+
|id1|count(1)|
+---+--------+
+---+--------+



In [33]:
spark.sql('''
select * from df_ca_mongo_tbl  where id1='5d355ac1106d4e00011cb906'
''').show(100, False)

+------------------------+------------------+
|id1                     |score             |
+------------------------+------------------+
|5d355ac1106d4e00011cb906|11.0              |
|5d355ac1106d4e00011cb906|11.25             |
|5d355ac1106d4e00011cb906|11.291666666666666|
|5d355ac1106d4e00011cb906|11.28             |
+------------------------+------------------+



In [25]:
#from pyspark.sql.functions import broadcast
#df_events_new = broadcast(spark.table("df_tbl")).join(spark.table("df_ca_tbl"), "item_id")

In [74]:
## Testing
#df=normalize_score(df_ca_final)

In [75]:
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- score: double (nullable = true)
 |-- score_v: vector (nullable = true)
 |-- popularity_score: vector (nullable = true)



In [77]:
df.show(10, False)

+------------------------+------------------+--------------------+---------------------+
|_id                     |score             |score_v             |popularity_score     |
+------------------------+------------------+--------------------+---------------------+
|5c7fb54364ad71000120c25f|10.0              |[10.0]              |[-0.9460964885829615]|
|5d019b1c650b1600015e6e27|11.0              |[11.0]              |[0.05390351141703853]|
|5d189a2f650b160001616b4b|11.0              |[11.0]              |[0.05390351141703853]|
|5d35b272106d4e00011f3675|11.217391304347826|[11.217391304347826]|[0.2712948157648647] |
|5bbe6d997c1e9000015bace0|11.461538461538462|[11.461538461538462]|[0.5154419729555002] |
|5d35b662106d4e00011f3b6d|11.227272727272727|[11.227272727272727]|[0.28117623868976516]|
|5bbf707ba6e5f20001a50fed|11.481481481481481|[11.481481481481481]|[0.5353849928985195] |
|5d12567a650b1600016011a8|11.217391304347826|[11.217391304347826]|[0.2712948157648647] |
|5cff02c3650b1600015d

In [None]:
# Testing Ends

In [41]:
update_score_in_mongo(df_ca_mongo)

In [61]:
print(p)

<function p at 0x7f8da0c4a400>


In [59]:
def p():
    return "hello"

In [None]:
## USER PERSONALIZATION LOGIC STARTS ## 

In [42]:
df_active.show(10, False)

+--------------------------------+------------------------+-------+----------+
|user_id                         |item_id                 |country|timestamp |
+--------------------------------+------------------------+-------+----------+
|987a9f8d9eed87796620f3d9c0dcf911|5d189605650b160001616942|CA     |1562975978|
|61e448f186b4f14151c0fa57811afca3|5ce09288517b8300016fc74c|US     |1561432929|
|7a5dbc9b61bb726ac416cc14e2492ac2|5cb501459e9933000187ab22|US     |1555527030|
|132e0f117786fc5979f24085d4705cf1|5d075ece650b1600015efb18|CA     |1563120276|
|9bd43e0b9ddadbdbcbc3ef52215b7745|5cfaf24d650b1600015d591c|CA     |1560154896|
|cd08ef3ba3aad8c5a7bdb7715d63afa7|5ca92d6e9e993300014c53d4|US     |1555009779|
|1490334b1e10b7bfaa2d1f1950a433f8|5cfbafcf95148a000175bff9|US     |1560115335|
|7d17411ea1fbac68fe0b66c61fd08af5|5cff0abf650b1600015dcf9c|CA     |1562822107|
|ba14c159f8ac28f3dabbb224ea8c1e67|5ce53fc6650b1600015775da|CA     |1563496016|
|3f296bc7a95a0bf5ca4f7e44f839044d|5c998f3e6b8d820001

In [49]:
def user_item_popularity(df):
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, col
    
    df.registerTempTable("df_tbl")
    
    df_tmp=spark.sql('''
                select user_id, item_id, count(*) as count_score from df_tbl
                group by 1, 2 order by 1,3 desc
                ''')
    

    window = Window.partitionBy(df_tmp['user_id']).orderBy(df_tmp['count_score'].desc())

    return df_tmp.select('user_id','item_id', rank().over(window).alias('score'))


def user_item_similarities(df, df_similar):
    
    df.registerTempTable("df_user_events")
    df_similar.registerTempTable("df_similar_items")
    
    return spark.sql('''
        select df.user_id, df_similar.id1 as _id, min((df.score + df_similar.similarity_score)) as score
        from df_user_events df, 
             df_similar_items df_similar
        where df.item_id=  df_similar.id2
        group by 1,2 order by 1,3 asc
    ''')

In [44]:
a=user_item_popularity(df_active)

In [45]:
a.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- score: integer (nullable = true)



In [46]:
a.show(100,False)

+--------------------------------+------------------------+-----+
|user_id                         |item_id                 |score|
+--------------------------------+------------------------+-----+
|0408ee02bab4726393d2dbb26adb9e5d|5d347fad106d4e00011b1c18|1    |
|1149d7e68f75b061ab0d829e537e8a92|5d0ff5f5d6402400015dad86|1    |
|55d03bb1d723f64e57f6144153b365cf|5d3095526cf18100012d8228|1    |
|051cc4acc2edf837c2d4bfb9ab4e2609|5c559c62040f8b000165f80f|1    |
|662bf29981476f4ed14bad4c7f67b4eb|5c9ae02368734500014b4a87|1    |
|662bf29981476f4ed14bad4c7f67b4eb|5c89855b0c69a3000151535f|2    |
|662bf29981476f4ed14bad4c7f67b4eb|5c9233aabe12b10001506e3b|2    |
|662bf29981476f4ed14bad4c7f67b4eb|5c5ba4cfd4c80c000105b167|2    |
|662bf29981476f4ed14bad4c7f67b4eb|5c5198cad3f73c00012d6640|2    |
|746cbe5d9cdb44ce2d05cfa55e948c9d|5d33f52c80e0170001ec307d|1    |
|746cbe5d9cdb44ce2d05cfa55e948c9d|5d2b3cad943d490001e03c30|2    |
|746cbe5d9cdb44ce2d05cfa55e948c9d|5cb951299e99330001defed8|2    |
|746cbe5d9

In [50]:
b=user_item_similarities(a,df_ca_similar_items)

In [51]:
b.show(100,False)

+--------------------------------+------------------------+------------------+
|user_id                         |_id                     |score             |
+--------------------------------+------------------------+------------------+
|0031d33bcfa3e7e69b58fc27a2f36661|5d1faf9f650b160001617b1f|1.0               |
|0031d33bcfa3e7e69b58fc27a2f36661|5ce6f610650b16000158fa89|1.2173913043478262|
|0031d33bcfa3e7e69b58fc27a2f36661|5d367803106d4e00011f76ab|1.2173913043478262|
|0031d33bcfa3e7e69b58fc27a2f36661|5d35aa93106d4e00011f2aea|1.25              |
|0031d33bcfa3e7e69b58fc27a2f36661|5d076187650b1600015efcfb|1.28              |
|00345349a8d815080067497d50a4439c|5cd0a8ce650b1600015693b8|1.0               |
|00345349a8d815080067497d50a4439c|5cc4f163650b1600015538d9|1.4074074074074074|
|00345349a8d815080067497d50a4439c|5cc67a2f650b160001554c95|1.4074074074074074|
|00345349a8d815080067497d50a4439c|5d35ac9c106d4e00011f2cd8|1.4444444444444444|
|00345349a8d815080067497d50a4439c|5cfae621650b160001

In [52]:

df_save=b.select("user_id","_id","score")\
    .withColumn("Recommendations", F.struct(F.col("_id"), F.col("score")))\
    .select("user_id","Recommendations")\
    .groupby("user_id").agg(F.collect_list("Recommendations").alias("Recommendations"))  
    
    

In [53]:
df_save.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- Recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- score: double (nullable = true)



In [54]:
df_save.repartition(1) \
        .write.format("json") \
        .mode("overwrite") \
        .option("header","true")\
        .save("userrr_2")


In [None]:
df_save.count()

In [4]:
df_a = spark.read.format("json").load(P_EVENT_LOGS_A)
    #Non-Anonymous logs
df_na = spark.read.format("json").load(P_EVENT_LOGS_NA)
    

In [9]:
df_a.printSchema()

root
 |-- contactId: string (nullable = true)
 |-- event: string (nullable = true)
 |-- eventCategory: string (nullable = true)
 |-- eventChannel: string (nullable = true)
 |-- eventComponent: string (nullable = true)
 |-- eventDesc: string (nullable = true)
 |-- eventPageType: string (nullable = true)
 |-- eventTimestamp: string (nullable = true)
 |-- label: string (nullable = true)
 |-- locationId: string (nullable = true)
 |-- locationName: string (nullable = true)
 |-- locationURL: string (nullable = true)
 |-- msgId: string (nullable = true)
 |-- partnerName: string (nullable = true)
 |-- propertyCity: string (nullable = true)
 |-- propertyCountry: string (nullable = true)
 |-- propertyId: string (nullable = true)
 |-- propertyProvince: string (nullable = true)
 |-- propertyReferrer: string (nullable = true)
 |-- propertyStatus: string (nullable = true)
 |-- propertyTitle: string (nullable = true)
 |-- propertyURL: string (nullable = true)
 |-- searchFilter: struct (nullable = tru

In [10]:
@

In [11]:
df_union.registerTempTable("df_union_tbl")

In [12]:
df_union.printSchema()

root
 |-- userId: string (nullable = true)
 |-- propertyId: string (nullable = true)
 |-- event: string (nullable = true)
 |-- eventChannel: string (nullable = true)



In [16]:
spark.sql('''
select event, eventChannel, count(1)
from df_union_tbl 
where eventChannel='BE'
group by 1,2
order by 3 desc

''').show(1000, False)

+----------------------+------------+--------+
|event                 |eventChannel|count(1)|
+----------------------+------------+--------+
|Save search           |BE          |16095   |
|Favorited a property  |BE          |1969    |
|Shared URL            |BE          |160     |
|Favorited a location  |BE          |101     |
|Property visit request|BE          |77      |
|Contact request       |BE          |75      |
|Request for Financing |BE          |74      |
|Sign up email         |BE          |4       |
+----------------------+------------+--------+



In [20]:
spark.sql('''
select event, eventChannel, count(1)
from df_union_tbl 
where eventChannel='FE'
and event not like 'Min%'
and event not like 'Bread%'
group by 1,2
order by 3 desc

''').show(1000, False)

+----------------------------------------+------------+--------+
|event                                   |eventChannel|count(1)|
+----------------------------------------+------------+--------+
|ThumbnailClicked                        |FE          |11821   |
|Home                                    |FE          |11494   |
|PhotoClicked                            |FE          |9560    |
|Select                                  |FE          |5293    |
|GoBackClicked                           |FE          |4368    |
|SupportCTAClicked                       |FE          |4137    |
|BankLogoClicked                         |FE          |3557    |
|CommuteClicked                          |FE          |2557    |
|SortingClicked                          |FE          |2439    |
|AlertClicked                            |FE          |2434    |
|Property                                |FE          |2279    |
|CalculateClicked                        |FE          |1712    |
|Favorited               

In [None]:
:contact_mortgage, 
:contact_brokerage, 
:property_schedule_visit, 
:filter_set, 
:share, 
:exit, 
:loc_signup, 
:favorite_property, 
:put_contact, 
:put_interest, 
:send_link
                                            

In [None]:
Share Search - "GetURLClicked"
Send Link - "LinkEmailed"

In [32]:
!pip install Image

