<a href="https://colab.research.google.com/github/pcbzmani/Learning-repository-for-scala-and-pyspark/blob/main/final_jobathon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 56 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 74.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7831b139028ab7e1813baecf470ed407e62c13856b27ddcfd610d818c8626042
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Mount Google Drive at /content/drive; the input paths passed to
# sample_function later in the notebook live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
# Build (or reuse) the notebook's Spark session: local master, app name
# "jobathon", Spark UI configured on port 4040.
spark = (
    SparkSession.builder
    .master("local")
    .appName("jobathon")
    .config('spark.ui.port', '4040')
    .getOrCreate()
)

In [28]:
# Import libraries
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Function input  - Spark session, clickstream JSON path, login CSV path
# Function output - final Spark DataFrame
def sample_function(spark, s3_clickstream_path, s3_login_path):
  """Summarise clickstream activity per day, browser and user.

  Args:
    spark: SparkSession used to read both inputs.
    s3_clickstream_path: Location of the clickstream data, read as JSON. The
      code reads the fields session_id, browser_id, event_type,
      event_date_time and client_side_data.current_page_url from the joined
      data.
    s3_login_path: Location of the login data, read as CSV with a header row.
      It is joined to the clickstream on session_id (presumably it is what
      supplies user_id - confirm against the file).

  Returns:
    A Spark DataFrame with one row per (current_date, browser_id, user_id)
    and the columns current_date, browser_id, user_id, logged_in, first_url,
    number_of_clicks and number_of_pageloads.
  """
  # Only these event types feed the output. Passing them to pivot() explicitly
  # guarantees the 'click' and 'pageload' columns exist even when the input
  # has no event of one type (otherwise the column references below fail),
  # and saves Spark the extra pass it needs to discover the distinct values.
  counted_event_types = ['click', 'pageload']

  clickstream = spark.read.format("json").load(s3_clickstream_path)
  user_mapping = spark.read.format("csv").option("header", True).load(s3_login_path)

  # Left join keeps clickstream events whose session has no login record;
  # the columns coming from the login data are null for those rows.
  events = (
    clickstream
    .join(user_mapping, 'session_id', 'left_outer')
    .withColumn('client_page_url', F.col('client_side_data').getField("current_page_url"))
    # current_date is the text before the first space of event_date_time.
    .withColumn('current_date', F.split('event_date_time', ' ')[0])
    .drop('client_side_data', 'login_date_time')
  )

  # Number of events of each counted type per (date, browser, user).
  event_counts = (
    events
    .groupBy('current_date', 'browser_id', 'user_id')
    .pivot('event_type', counted_event_types)
    .agg(F.count('event_type'))
  )

  # Attach the per-group counts to every event. user_id is compared null-safe
  # so events without a user still pick up the counts of their group.
  events_with_counts = (
    events
    .join(
      event_counts,
      [
        events['user_id'].eqNullSafe(event_counts['user_id']),
        events['current_date'] == event_counts['current_date'],
        events['browser_id'] == event_counts['browser_id'],
      ],
      'left_outer',
    )
    .drop(event_counts['user_id'])
    .drop(event_counts['browser_id'])
    .drop(event_counts['current_date'])
  )

  # Orders the events of each group chronologically so the earliest one can be
  # kept. Events sharing an event_date_time have no tiebreaker, so which of
  # them is ranked first is not deterministic.
  first_event_window = (
    Window
    .partitionBy('current_date', 'browser_id', 'user_id')
    .orderBy(F.col('event_date_time').asc())
  )

  # logged_in is 1 only when a user is known and the group has at least one
  # pageload; a null pageload count makes the condition null, which yields 0.
  is_logged_in = F.col('user_id').isNotNull() & (F.col('pageload') > 0)

  return (
    events_with_counts
    .withColumn('logged_in', F.when(is_logged_in, F.lit(1)).otherwise(F.lit(0)))
    .withColumn('row_number', F.row_number().over(first_event_window))
    # Groups with no events of a type get a null count from the pivot.
    .withColumn('number_of_pageloads', F.coalesce(F.col('pageload'), F.lit(0)))
    .withColumn('number_of_clicks', F.coalesce(F.col('click'), F.lit(0)))
    .filter(F.col('row_number') == 1)
    .select(
      'current_date',
      'browser_id',
      'user_id',
      'logged_in',
      F.col('client_page_url').alias('first_url'),
      F.col('number_of_clicks'),
      F.col('number_of_pageloads'),
    )
  )

In [30]:
# Run the pipeline on the click and login files stored on the mounted Drive.
result_df = sample_function(
    spark,
    '/content/drive/MyDrive/JobaThon_Sep_2022/jobathon_click_data.json',
    '/content/drive/MyDrive/JobaThon_Sep_2022/jobathon_login_data.csv',
)

In [31]:
# Print the column names, types and nullability of the result.
result_df.printSchema()

root
 |-- current_date: string (nullable = true)
 |-- browser_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- logged_in: integer (nullable = false)
 |-- first_url: string (nullable = true)
 |-- number_of_clicks: long (nullable = false)
 |-- number_of_pageloads: long (nullable = false)



In [32]:
# Display summary statistics for every column of the result.
result_df.summary().show()

+-------+------------+--------------------+-------------+-------------------+--------------------+------------------+-------------------+
|summary|current_date|          browser_id|      user_id|          logged_in|           first_url|  number_of_clicks|number_of_pageloads|
+-------+------------+--------------------+-------------+-------------------+--------------------+------------------+-------------------+
|  count|      305456|              305456|        14393|             305456|              305456|            305456|             305456|
|   mean|        null|                null|     Infinity|0.03377573202032371|                null|1.9550573568697291|  0.860035487926248|
| stddev|        null|                null|          NaN|0.18065170574120537|                null| 6.626274849572005| 1.1758805155597627|
|    min|  2022-07-31|0009xtvcUjrhxlw2F...|   1000000406|                  0|https://www.gosho...|                 0|                  0|
|    25%|        null|            

In [33]:
# Show result rows without truncating long cell values such as first_url.
result_df.show(truncate=False)

+------------+--------------------------------+-------+---------+-----------------------------------------------------------------------------------------+----------------+-------------------+
|current_date|browser_id                      |user_id|logged_in|first_url                                                                                |number_of_clicks|number_of_pageloads|
+------------+--------------------------------+-------+---------+-----------------------------------------------------------------------------------------+----------------+-------------------+
|2022-07-31  |00AFmUbwoEEAkn3x7QlmqWmEXeYAArGC|null   |0        |https://www.goshop.com/fZfjGgJF2EdN1hdLwCCbFmpBO                                         |1               |0                  |
|2022-07-31  |00LdZ46WwvtEzQXaosKx4IdAIdE3TJmB|null   |0        |https://www.goshop.com/6xgsGR1xsXJNc8mZOlH1lk1a                                          |0               |1                  |
|2022-07-31  |00ZYtPbQ1OlhEjGf4qUHl