In [None]:
# awi spawn emr instance -- python code in s3://python_code.py -master-node  m5.2xlarge -worker-node m5.xlarge -worker-node 10

In [None]:
!pip install pyspark py4j



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

In [None]:
spark = SparkSession.builder.appName('SparkByExamples.com').master("local[*]").getOrCreate()


# Reading Data

In [None]:
df = spark.read.csv('click_stream.csv', header=True, inferSchema=True)

In [None]:
df.printSchema()

root
 |-- hotel_id: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- profile_type: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- visitor_id: string (nullable = true)
 |-- user_mcid: string (nullable = true)
 |-- page_name: string (nullable = true)
 |-- meta_activity_name: string (nullable = true)
 |-- meta_act_cntnt: long (nullable = true)
 |-- meta_timezone: string (nullable = true)
 |-- meta_user_agent: string (nullable = true)
 |-- lob_name: string (nullable = true)
 |-- pg_nm_omniture: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- dvc_g_city: string (nullable = true)
 |-- os: string (nullable = true)
 |-- os_version: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- query_city: string (nullable = true)
 |-- query_country: string (nullable = true)
 |-- query_travel_purpose_optd: string (nullable = true)
 |-- travel_purpose: string (nullable = true)
 |-- query_type: string (nullable = true)
 |--

In [None]:
df.show(1)

+------------------+-----------+------------+----------------+--------------------+--------------------+---------+------------------+--------------+-------------------+---------------+--------+--------------+-----------+-----------+-------+----------+-----------+----------+-------------+-------------------------+--------------+----------+-----------+------------+-------------+------------+--------------+------------+-----------------+-----------+----+-------------+--------------+-------------+-----------------+------------+------------------+------+-----------------+----------------+----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+---------------+-----------+-----------+------------------+------

In [None]:
df.count()

11715

# Let's get more detailed info

In [None]:
df.describe().show()

+-------+--------------------+-----------+------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+-------------+-----------------+--------+--------------------+-----------+----------+-------+------------------+------------------+----------+-------------+-------------------------+--------------+----------+--------------------+---------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+-------------+------------------+-----------------+-----------------+------------------+------------------+------+------------------+-----------------+-----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+----------

In [None]:
df.columns[:10]

['hotel_id',
 'uuid',
 'profile_type',
 'device_id',
 'visitor_id',
 'user_mcid',
 'page_name',
 'meta_activity_name',
 'meta_act_cntnt',
 'meta_timezone']

# How to select rows?

In [None]:
df.select('htl_city_code', 'country_code').where(df.htl_city_code.isNotNull() & df.country_code.isNotNull()).show()


+-------------+------------+
|htl_city_code|country_code|
+-------------+------------+
|  high_to_low|       false|
|  high_to_low|       false|
|  high_to_low|       false|
+-------------+------------+



In [None]:
df.createOrReplaceTempView("click_stream")
spark.sql("select htl_city_code, country_code from click_stream where country_code is not NULL").show()

+-------------+------------+
|htl_city_code|country_code|
+-------------+------------+
|  high_to_low|       false|
|  high_to_low|       false|
|  high_to_low|       false|
+-------------+------------+



# Missing Values

In [None]:
# df = df.dropna()

In [None]:
null_counts = df.select([sf.count(sf.when(sf.col(c).isNull(), c)).alias(c) for c in df.columns])

total_rows = df.count()
# Calculating percentage of nulls for each column
null_percentage = null_counts.select([(sf.col(c) / total_rows * 100).alias(c) for c in null_counts.columns])
null_percentage.show()

+--------+------------------+------------+------------------+-----------------+-----------------+---------+------------------+-----------------+-------------------+-----------------+--------+-----------------+-----------+------------------+---+------------------+-----------+----------+-------------+-------------------------+------------------+------------------+-----------------+------------+-------------+------------+-----------------+------------+------------------+-----------+---+-----------------+------------------+-------------+-----------------+-----------------+------------------+------+------------------+------------------+------------------+-----------------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+------

In [None]:
mean_value = df.select(sf.mean(df['total_children'])).collect()[0][0]

df = df.na.fill({'total_children': mean_value})


## Why we can't call directly on columns

In [None]:
sf.mean(df['total_children']).collect()

# This error arises because sf.mean(df['total_children']) returns a Column object,
# \which represents a column expression in a DataFrame. You're trying to call collect() directly on this Column object,
# but collect() is a method that can be called only on a DataFrame, not on a Column object.

TypeError: 'Column' object is not callable

In [None]:
type(df.select(sf.mean(df['total_children'])))

pyspark.sql.dataframe.DataFrame

In [None]:
type(sf.mean(df['total_children']))

pyspark.sql.column.Column

# Let's try to find out mode

In [None]:
df.select('htl_city_code').groupBy('htl_city_code').count().orderBy('count', ascending=False).show()

+-------------+-----+
|htl_city_code|count|
+-------------+-----+
|          DEL| 5857|
|          DED| 4615|
|          DBG|  452|
|          DBI|  377|
|          DBV|  236|
|          DDM|  117|
|         DEAS|   28|
|         DBV1|   18|
|         DAVO|   11|
|  high_to_low|    3|
|         DCAR|    1|
+-------------+-----+



In [None]:
df.select(sf.mode(df['htl_city_code'])).show()

+-------------------+
|mode(htl_city_code)|
+-------------------+
|                DEL|
+-------------------+



## How to do groupby??

In [None]:
df.groupby("htl_city_code").agg(sf.mean("total_adults").alias('avg_adults_in_city')).show()

+-------------+------------------+
|htl_city_code|avg_adults_in_city|
+-------------+------------------+
|          DDM| 4.119658119658119|
|          DBG|1.8141592920353982|
|         DEAS|               2.0|
|          DBV|1.3008474576271187|
|         DBV1|               1.0|
|          DED| 2.235969664138678|
|  high_to_low|              NULL|
|         DAVO| 4.636363636363637|
|          DEL| 2.097660918558989|
|         DCAR|               2.0|
|          DBI|2.1989389920424403|
+-------------+------------------+



In [None]:
df.select('htl_city_code', 'country_code').show()

# select using sql

# df.createOrReplaceTempView("click_stream")
# spark.sql("select cpn_status, cpn_id from click_stream").show()

+-------------+------------+
|htl_city_code|country_code|
+-------------+------------+
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
|         DAVO|        NULL|
|          DEL|        NULL|
+-------------+------------+
only showing top 20 rows



## How to Filter?

In [None]:
df.filter(df.total_adults.isin([2, 5, 8, 10])).show()

+------------------+-----------+------------+--------------------+--------------------+--------------------+---------+------------------+------------------+-------------------+---------------+--------+--------------------+-----------+----------+-------+----------+-----------+----------+-------------+-------------------------+--------------+----------+--------------------+------------+-------------+------------+--------------+------------+-----------------+-----------+-----+-------------+--------------+-------------+-----------------+------------+------------------+------+-----------------+----------------+----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+---------------+-----------+-----------+--

In [None]:
df.filter(df.total_adults.isin([2, 5, 8, 10])).show()

+------------------+-----------+------------+--------------------+--------------------+--------------------+---------+------------------+------------------+-------------------+---------------+--------+--------------------+-----------+----------+-------+----------+-----------+----------+-------------+-------------------------+--------------+----------+--------------------+------------+-------------+------------+--------------+------------+-----------------+-----------+-----+-------------+--------------+-------------+-----------------+------------+------------------+------+-----------------+----------------+----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+---------------+-----------+-----------+--

In [None]:
# Sample transformations
df.filter(df['total_adults'] == 2).groupBy('total_children').agg({'total_room_nights': 'mean'}).withColumnRenamed('avg(total_room_nights)', 'room_per_night').show()

+-------------------+-----------------+
|     total_children|   room_per_night|
+-------------------+-----------------+
|                0.0|1.634054834054834|
|0.11966548840417278|1.018181818181818|
|                1.0|1.951417004048583|
|                4.0|              1.0|
|                3.0|              2.0|
|                2.0|        1.7109375|
+-------------------+-----------------+



In [None]:
df = df.withColumn('couple', sf.when(df.total_adults > 2, 'not-couple').otherwise('couple'))
df.show()

+------------------+-----------+------------+--------------------+--------------------+--------------------+---------+------------------+------------------+-------------------+---------------+--------+--------------------+-----------+----------------+-------+----------+-----------+----------+-------------+-------------------------+--------------+----------+--------------------+------------+-------------+------------+--------------+------------+-----------------+-----------+-----+-------------+--------------+-------------+-----------------+------------+------------------+------+-----------------+----------------+----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+---------------+-----------+--------

In [None]:
df.select('couple').groupBy('couple').count().show()

+----------+-----+
|    couple|count|
+----------+-----+
|    couple| 9982|
|not-couple| 1733|
+----------+-----+



In [None]:
df.withColumnRenamed('couple', '2-adults').show()

+------------------+-----------+------------+--------------------+--------------------+--------------------+---------+------------------+------------------+-------------------+---------------+--------+--------------------+-----------+----------------+-------+----------+-----------+----------+-------------+-------------------------+--------------+----------+--------------------+------------+-------------+------------+--------------+------------+-----------------+-----------+-----+-------------+--------------+-------------+-----------------+------------+------------------+------+-----------------+----------------+----------------+---------+---------+---------+-----------------+-------------------+--------------+-------------+--------------+-----------------------+------------------------+-----------+----------------+----------------------+----------------+--------------+-----------+----------+------------------+-------------------+--------------------+---------------+-----------+--------

# Let's do EDA

In [None]:
# Assuming 'checkin_date' and 'checkout_date' are in the format 'yyyy-MM-dd'
df = df.withColumn("checkin_date", sf.to_date(df.checkin_date, 'yyyy-MM-dd'))
df = df.withColumn("checkout_date", sf.to_date(df.checkout_date, 'yyyy-MM-dd'))

# Calculating the length of stay
df = df.withColumn("length_of_stay", sf.datediff(df.checkout_date, df.checkin_date))


In [None]:
df.select('length_of_stay').show()

+--------------+
|length_of_stay|
+--------------+
|             2|
|             4|
|             2|
|             3|
|             1|
|             1|
|             2|
|             1|
|             1|
|             2|
|             2|
|             1|
|             2|
|             5|
|             1|
|             1|
|             2|
|             1|
|             1|
|             1|
+--------------+
only showing top 20 rows



## Let's see how to perform Label Encoding and OneHotEncoding

In [None]:
from pyspark.ml.feature import StringIndexer

# StringIndexer to convert the categorical text data into indices
indexer = StringIndexer(inputCol="device_type", outputCol="device_type_index")

# Applying the indexer to the DataFrame
df_indexed = indexer.fit(df).transform(df)

# Show the result
df_indexed.select("device_type", "device_type_index").show(100)



+-----------+-----------------+
|device_type|device_type_index|
+-----------+-----------------+
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mobile|              0.0|
|     mo

In [None]:
df_indexed.filter(df_indexed.device_type != 'mobile').select('device_type', 'device_type_index').show()


+-----------+-----------------+
|device_type|device_type_index|
+-----------+-----------------+
|        pwa|              2.0|
|        pwa|              2.0|
|    desktop|              1.0|
|        pwa|              2.0|
|        pwa|              2.0|
|        pwa|              2.0|
|    desktop|              1.0|
|        pwa|              2.0|
|        pwa|              2.0|
|    desktop|              1.0|
|    desktop|              1.0|
|        pwa|              2.0|
|        pwa|              2.0|
|    desktop|              1.0|
|        pwa|              2.0|
|    desktop|              1.0|
|    desktop|              1.0|
|    desktop|              1.0|
|    desktop|              1.0|
|        pwa|              2.0|
+-----------+-----------------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline

# Create a OneHotEncoder
encoder = OneHotEncoder(inputCols=["device_type_index"], outputCols=["device_type_encoded"])

# Combine StringIndexer and OneHotEncoder in a Pipeline
pipeline = Pipeline(stages=[indexer, encoder])

# Fit and transform the Pipeline to the DataFrame
df_encoded = pipeline.fit(df).transform(df)

# Show the results
df_encoded.select("device_type", "device_type_index", "device_type_encoded").show()


+-----------+-----------------+-------------------+
|device_type|device_type_index|device_type_encoded|
+-----------+-----------------+-------------------+
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile|              0.0|      (2,[0],[1.0])|
|     mobile

In [None]:
df_encoded.filter(df_encoded.device_type != 'mobile').select('device_type', 'device_type_index', "device_type_encoded").show()


+-----------+-----------------+-------------------+
|device_type|device_type_index|device_type_encoded|
+-----------+-----------------+-------------------+
|        pwa|              2.0|          (2,[],[])|
|        pwa|              2.0|          (2,[],[])|
|    desktop|              1.0|      (2,[1],[1.0])|
|        pwa|              2.0|          (2,[],[])|
|        pwa|              2.0|          (2,[],[])|
|        pwa|              2.0|          (2,[],[])|
|    desktop|              1.0|      (2,[1],[1.0])|
|        pwa|              2.0|          (2,[],[])|
|        pwa|              2.0|          (2,[],[])|
|    desktop|              1.0|      (2,[1],[1.0])|
|    desktop|              1.0|      (2,[1],[1.0])|
|        pwa|              2.0|          (2,[],[])|
|        pwa|              2.0|          (2,[],[])|
|    desktop|              1.0|      (2,[1],[1.0])|
|        pwa|              2.0|          (2,[],[])|
|    desktop|              1.0|      (2,[1],[1.0])|
|    desktop

In [None]:
df_encoded.select('device_type').groupBy('device_type').count().show()

+-----------+-----+
|device_type|count|
+-----------+-----+
|    desktop|   97|
|        pwa|   90|
|     mobile|11528|
+-----------+-----+



In [None]:
df.dropna().count()

0

In [None]:
# udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType


# in pyspark we've something called udf
def custom_python_function(value):
  if value == 'not-couple':
    return 0
  else:
    return 1
  # return value * 2

udf_custom_function =udf(custom_python_function, StringType())


df.withColumn('mod_total_adults', udf_custom_function(sf.col('couple'))).show()

In [None]:
# primary stuff in module test
1. drift
2. experiment tracking
3. github
4. docker