<a href="https://colab.research.google.com/github/pcbzmani/PySpark-Examples/blob/main/final_jobathon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ab309e824cc5678edeb97df7347f7b0b2b05939612ab46f884828742cafc1fe4
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Mount Google Drive so the JobaThon input files under /content/drive/MyDrive are readable.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
# Local Spark session for the notebook; the Spark UI is served on port 4040.
spark = (
    SparkSession.builder
    .master("local")
    .appName("jobathon")
    .config('spark.ui.port', '4040')
    .getOrCreate()
)

In [17]:
# Import libraries
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Function input - spark object, click data path, resolved data path
# Function output - final spark dataframe
def sample_function(spark, s3_clickstream_path, s3_login_path):
  """Summarise clickstream activity per (user_id, current_date, browser_id).

  Args:
    spark: SparkSession used to read both inputs.
    s3_clickstream_path: clickstream events, read as JSON.
    s3_login_path: user/session mapping, read as CSV with a header row.

  The joined data must provide session_id (join key), user_id, browser_id,
  event_type, event_date_time and a client_side_data struct containing
  current_page_url.

  Returns:
    Spark DataFrame with one row per (user_id, current_date, browser_id) and columns
    current_date, browser_id, user_id, logged_in, first_url, number_of_clicks,
    number_of_pageloads. Only events whose session maps to a non-null user_id are
    kept. logged_in is the integer 1 when the group has at least one 'pageload'
    event, else 0.
  """
  df_clickstream = spark.read.format("json").load(s3_clickstream_path)
  user_mapping = spark.read.format("csv").option("header", True).load(s3_login_path)

  # Keep only events whose session belongs to a registered user. An inner join plus
  # dropping null user_ids keeps exactly the rows a left join + na.drop would keep.
  df = (
    df_clickstream
    .join(user_mapping, 'session_id', 'inner')
    .na.drop(how='any', subset=['user_id'])
  )

  # Since client_side_data is a STRUCT, getField extracts the page URL.
  # current_date is the text before the first space of event_date_time
  # (assumes a "<date> <time>" string layout -- TODO confirm against the data).
  df = (
    df
    .withColumn('client_page_url', F.col('client_side_data').getField('current_page_url'))
    .withColumn('current_date', F.split('event_date_time', ' ')[0])
    .drop('client_side_data')
  )

  group_cols = ['user_id', 'current_date', 'browser_id']
  # Unordered window: aggregates over the whole (user, date, browser) group. Using a
  # window instead of pivot + self-join avoids a distinct-values pass and keeps
  # groups with null keys (a null join key never matches in an equi-join).
  group_window = Window.partitionBy(*group_cols)
  # Ordered window: earliest event first, used to pick first_url. Ties on
  # event_date_time are broken arbitrarily by row_number. If event_date_time is a
  # string it sorts lexically -- correct only for ISO-like formats (verify).
  first_event_window = group_window.orderBy(F.col('event_date_time').asc())

  def _count_events(event_type):
    # Number of rows with this event_type in the row's group; always non-null
    # (0 when the type is absent), so no coalesce is needed.
    return F.count(F.when(F.col('event_type') == event_type, True)).over(group_window)

  df = (
    df
    .withColumn('click', _count_events('click'))
    .withColumn('pageload', _count_events('pageload'))
    # Both branches are integers, so logged_in stays numeric (a '0' string literal
    # would make Spark coerce the whole column to string).
    .withColumn('logged_in', F.when(F.col('pageload') > 0, F.lit(1)).otherwise(F.lit(0)))
    .withColumn('row_number', F.row_number().over(first_event_window))
    .filter(F.col('row_number') == 1)
    .select(
      'current_date', 'browser_id', 'user_id', 'logged_in',
      F.col('client_page_url').alias('first_url'),
      F.col('click').alias('number_of_clicks'),
      F.col('pageload').alias('number_of_pageloads'),
    )
  )

  # Return the final spark df
  return df

In [18]:
# Input locations on the mounted Google Drive; change DATA_DIR to run against another copy.
DATA_DIR = '/content/drive/MyDrive/JobaThon_Sep_2022'
CLICK_DATA_PATH = f'{DATA_DIR}/jobathon_click_data.json'
LOGIN_DATA_PATH = f'{DATA_DIR}/jobathon_login_data.csv'

result_df = sample_function(spark, CLICK_DATA_PATH, LOGIN_DATA_PATH)

In [7]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) for every output column.
result_df.summary().show(n=20, truncate=True)

+-------+------------+--------------------+-------------+------------------+--------------------+------------------+-------------------+
|summary|current_date|          browser_id|      user_id|         logged_in|           first_url|  number_of_clicks|number_of_pageloads|
+-------+------------+--------------------+-------------+------------------+--------------------+------------------+-------------------+
|  count|       14393|               14393|        14393|             14393|               14393|             14393|              14393|
|   mean|        null|                null|     Infinity|0.7295907732925728|                null| 5.417355659000903| 1.4812061418745224|
| stddev|        null|                null|          NaN|0.4441866555801319|                null|11.122998300836665| 1.8109505368604233|
|    min|  2022-07-31|0017kL71WJc4PlnUd...|   1000000406|                 0|https://www.gosho...|                 0|                  0|
|    25%|        null|                nul

In [19]:
# Spot-check the output rows produced for one sample user, without truncating long URLs.
result_df_user1 = result_df.where('user_id = "200000056E"')
result_df_user1.show(n=20, truncate=False)

+------------+--------------------------------+----------+---------+---------------------------------------------------------------+----------------+-------------------+
|current_date|browser_id                      |user_id   |logged_in|first_url                                                      |number_of_clicks|number_of_pageloads|
+------------+--------------------------------+----------+---------+---------------------------------------------------------------+----------------+-------------------+
|2022-08-05  |1wtjxX86BxOU9PTGWqV7BdDS51tKOJKq|200000056E|0        |https://www.goshop.com/VvOaxOwfDphojhOG7                       |1               |0                  |
|2022-08-05  |R3HlJQhj9gQcRsNOk8o1RkVAM3IryTky|200000056E|0        |https://www.goshop.com/qdfZSK3PZE2YMCHqqTNp                    |1               |0                  |
|2022-08-05  |WoxlTuHnBi3qkuZvdFPob1p2L2a7MSim|200000056E|0        |https://www.goshop.com/qdfZSK3PZE2YMCHqqTNp                    |1               |0

In [9]:
# Expose the result to Spark SQL under the name `resultdf`.
result_df.createOrReplaceTempView(name='resultdf')

In [14]:
# (user_id, date) pairs with more than one row in resultdf. resultdf has one row per
# (user_id, current_date, browser_id), so these pairs span multiple browser_ids.
# `current_date` is backtick-quoted so it always resolves to the column, never to the
# SQL CURRENT_DATE built-in (which is a reserved word under ANSI keyword enforcement).
brw = spark.sql("""
select user_id, `current_date`, count(*) from resultdf
group by user_id, `current_date`
having count(*) > 1
""")

In [15]:
# Show up to 500 duplicate (user, date) pairs with full-width columns.
brw.show(n=500, truncate=False)

+----------+------------+--------+
|user_id   |current_date|count(1)|
+----------+------------+--------+
|100000049A|2022-08-03  |2       |
|100000049A|2022-08-09  |2       |
|1000000542|2022-08-07  |2       |
|1000000572|2022-08-02  |2       |
|1000000591|2022-08-03  |2       |
|1000000596|2022-08-06  |2       |
|10000005AE|2022-08-04  |2       |
|10000005C3|2022-08-05  |2       |
|10000005D7|2022-08-08  |2       |
|1000000607|2022-08-01  |2       |
|100000062E|2022-08-08  |2       |
|1000000667|2022-08-01  |2       |
|1000000673|2022-08-10  |2       |
|100000067A|2022-08-09  |2       |
|10000006BE|2022-08-05  |2       |
|10000006BE|2022-08-07  |2       |
|10000006C6|2022-08-05  |2       |
|10000006EF|2022-08-02  |2       |
|12000003EF|2022-08-04  |2       |
|12000003F6|2022-08-07  |2       |
|1200000416|2022-08-01  |2       |
|1200000422|2022-08-04  |2       |
|120000042F|2022-08-04  |2       |
|1200000474|2022-08-07  |2       |
|12000004D1|2022-08-03  |13      |
|12000004D6|2022-08-