## Setting up Kaggle and downloading data

In [None]:
!rm -rf
!mkdir data

In [None]:
from google.colab import files

# Open the Colab upload dialog and keep what it returns (indexed by file name below).
uploaded = files.upload()

# Report the name and size of every uploaded file as a quick sanity check.
for fn, payload in uploaded.items():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(payload)))

Saving kaggle.json to kaggle.json
User uploaded file "kaggle.json" with length 67 bytes


In [None]:
# Install the Kaggle command-line client used by the download cell below.
# NOTE(review): the version is not pinned; consider pinning it for reproducible runs.
%pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
%%bash
# Stop at the first failing command; otherwise a failed download would still reach unzip.
set -euo pipefail

# Put the uploaded API token where this cell expects it: ~/.kaggle/kaggle.json.
mkdir -p ~/.kaggle
# Move the token only while it is still in the working directory, so re-running the cell
# after a successful first run does not fail on a missing file.
if [ -f kaggle.json ]; then
  mv kaggle.json ~/.kaggle/kaggle.json
fi
# Restrict the token to the current user (same path as above instead of a hard-coded /root).
chmod 600 ~/.kaggle/kaggle.json

# Download the dataset archive into data/ and unpack it there.
kaggle datasets download ivanchvez/99littleorange --path data/
# -o overwrites existing files without prompting, which a non-interactive cell cannot answer.
unzip -o -q data/99littleorange.zip -d data/

Downloading 99littleorange.zip to data



  0%|          | 0.00/87.7M [00:00<?, ?B/s]  6%|▌         | 5.00M/87.7M [00:00<00:02, 38.3MB/s] 23%|██▎       | 20.0M/87.7M [00:00<00:00, 97.7MB/s] 35%|███▌      | 31.0M/87.7M [00:00<00:00, 67.2MB/s] 56%|█████▌    | 49.0M/87.7M [00:00<00:00, 99.8MB/s] 70%|██████▉   | 61.0M/87.7M [00:01<00:00, 54.3MB/s] 93%|█████████▎| 82.0M/87.7M [00:01<00:00, 82.0MB/s]100%|██████████| 87.7M/87.7M [00:01<00:00, 80.3MB/s]


## Pyspark setup

In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

# unzip the spark file to the current folder
!tar xf spark-3.3.0-bin-hadoop3.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

# install findspark and pyspark using pip
%pip install -q findspark pyspark

--2022-08-24 18:20:44--  https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299321244 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.0-bin-hadoop3.tgz’


2022-08-24 18:20:45 (197 MB/s) - ‘spark-3.3.0-bin-hadoop3.tgz’ saved [299321244/299321244]

[K     |████████████████████████████████| 281.3 MB 50 kB/s 
[K     |████████████████████████████████| 199 kB 47.8 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Locate the Spark installation so that it can be used from this kernel:
# import findspark and call findspark.init().

import findspark
findspark.init()
# findspark.find() reports the Spark location in use; as the last expression it is displayed.
findspark.find()

'/content/spark-3.3.0-bin-hadoop3'

In [None]:
# SparkSession (from pyspark.sql) is the entry point to Spark.
# appName() names the session; config() adds optional settings.

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Leave the session as the last expression so the notebook displays it.
spark

## 1.1 Data Preparation and Loading (5%)

In [None]:
# Read the trips CSV, taking the column names from its header row.
trip_df = spark.read.option("header", True).csv("data/trip.csv")

# Inspect the schema and a sample of rows.
trip_df.printSchema()
trip_df.show(10)

# (number of rows, number of columns)
(trip_df.count(), len(trip_df.columns))

root
 |-- id: string (nullable = true)
 |-- driver_id: string (nullable = true)
 |-- passenger_id: string (nullable = true)
 |-- city_id: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- finish_time: string (nullable = true)
 |-- surge_rate: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- trip_fare: string (nullable = true)

+--------+---------+------------+-------+--------------------+--------------------+----------+-------------+---------+
|      id|driver_id|passenger_id|city_id|           call_time|         finish_time|surge_rate|trip_distance|trip_fare|
+--------+---------+------------+-------+--------------------+--------------------+----------+-------------+---------+
|a84c0db2| cb64172e|    2ea4359d|  43006|9/8/2019 10:01:28 PM|9/8/2019 10:15:54 PM|       0.3|         1.92|     4.36|
|a84c2d10| 3034e6fe|    e1bd0c8d|  43006| 7/1/2019 8:56:28 PM| 7/1/2019 9:22:09 PM|         0|         9.36|     6.91|
|a84c3e75| 93fb6802|    75e6d

(2318357, 9)

In [None]:
# Read the passengers CSV, taking the column names from its header row.
passenger_df = spark.read.option("header", True).csv("data/passenger.csv")

# Inspect the schema and a sample of rows.
passenger_df.printSchema()
passenger_df.show(5)

# (number of rows, number of columns)
(passenger_df.count(), len(passenger_df.columns))

root
 |-- id: string (nullable = true)
 |-- first_call_time: string (nullable = true)

+--------+--------------------+
|      id|     first_call_time|
+--------+--------------------+
|00000fab|5/11/2019 8:23:54 PM|
|0000149d|12/24/2018 9:21:3...|
|00002b66|9/20/2018 9:08:24 PM|
|00002bc7|7/28/2019 11:27:5...|
|0000332e|2/21/2019 11:34:2...|
+--------+--------------------+
only showing top 5 rows



(1235782, 2)

## 1.2 Data Partitioning in RDD (10%)

In [None]:
# Number of partitions of the RDD underlying the passenger DataFrame.
passenger_df.rdd.getNumPartitions()

1

In [None]:
# Number of partitions of the RDD underlying the trip DataFrame.
trip_df.rdd.getNumPartitions()

2

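Spark's default parallelism follows the number of cores made available by the master setting (`local` uses a single thread, `local[*]` uses all cores). The number of partitions of a DataFrame read from files, however, also depends on the size of the input and on `spark.sql.files.maxPartitionBytes` (128 MB by default). That is presumably why, with master `local`, the larger trip data is read into 2 partitions while the passenger data is read into 1.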

https://sparkbyexamples.com/spark/spark-partitioning-understanding/

In [None]:
# Show the CPU details of the machine running this notebook.
!lscpu

Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              2
On-line CPU(s) list: 0,1
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               79
Model name:          Intel(R) Xeon(R) CPU @ 2.20GHz
Stepping:            0
CPU MHz:             2199.998
BogoMIPS:            4399.99
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            56320K
NUMA node0 CPU(s):   0,1
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_sin

In [None]:
from pyspark.sql.functions import col

# Preview passengers whose first_call_time string contains "2019".
called_in_2019 = col("first_call_time").contains("2019")
passenger_df.where(called_in_2019).show(5)

+--------+--------------------+
|      id|     first_call_time|
+--------+--------------------+
|00000fab|5/11/2019 8:23:54 PM|
|00002bc7|7/28/2019 11:27:5...|
|0000332e|2/21/2019 11:34:2...|
|000048aa| 8/2/2019 8:01:07 AM|
|0000497f|7/11/2019 3:39:47 AM|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Label every passenger by whether the first_call_time string contains "2019".
flag_column = (
    F.when(F.col("first_call_time").contains("2019"), F.lit("In 2019"))
    .otherwise(F.lit("Not In 2019"))
)
pdf_2019 = passenger_df.withColumn("flag", flag_column)

In [None]:
# RDD view of the flagged passenger DataFrame (elements expose .flag and .id, as used below).
pdf2019rdd = pdf_2019.rdd

In [None]:
# Key every passenger id by its flag. A pair RDD must hold (key, value) 2-tuples; the
# previous single-entry dicts cannot be used by key-based RDD operations.
pairrdd = pdf2019rdd.map(lambda row: (row.flag, row.id))

In [None]:
# Preview the first five elements. take(5) fetches only what is needed, whereas
# collect()[:5] would first pull the entire RDD to the driver.
pairrdd.take(5)

[{'In 2019': '00000fab'},
 {'Not In 2019': '0000149d'},
 {'Not In 2019': '00002b66'},
 {'In 2019': '00002bc7'},
 {'In 2019': '0000332e'}]

In [None]:
# Inspect the Python type of the mapped RDD.
type(pairrdd)

pyspark.rdd.PipelinedRDD

In [None]:
# Write the flagged passengers as CSV with a header, split on disk by the value of `flag`.
# "overwrite" replaces the output of a previous run.
(
    pdf_2019.write
    .option("header", True)
    .partitionBy('flag')
    .mode("overwrite")
    .csv("2019")
)

In [None]:
# List the contents of the "2019" output directory written above.
!ls 2019

'flag=In 2019'	'flag=Not In 2019'   _SUCCESS


In [None]:
# List the files inside the "flag=In 2019" directory (the space is shell-escaped).
!ls 2019/flag=In\ 2019

part-00000-d8de2102-8728-4cb9-8665-9dfd82bcfce6.c000.csv


In [None]:
from pyspark.sql.functions  import spark_partition_id

# Count rows per in-memory Spark partition. The partition id goes into its own
# "partition_id" column instead of overwriting "flag", so the output is not mislabelled.
pdf_2019.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count().show()

+----+-------+
|flag|  count|
+----+-------+
|   0|1235782|
+----+-------+



## 1.3 Query/Analysis (15%)

In [None]:
# Preview the raw trip data.
trip_df.show()

+--------+---------+------------+-------+--------------------+--------------------+----------+-------------+---------+
|      id|driver_id|passenger_id|city_id|           call_time|         finish_time|surge_rate|trip_distance|trip_fare|
+--------+---------+------------+-------+--------------------+--------------------+----------+-------------+---------+
|a84c0db2| cb64172e|    2ea4359d|  43006|9/8/2019 10:01:28 PM|9/8/2019 10:15:54 PM|       0.3|         1.92|     4.36|
|a84c2d10| 3034e6fe|    e1bd0c8d|  43006| 7/1/2019 8:56:28 PM| 7/1/2019 9:22:09 PM|         0|         9.36|     6.91|
|a84c3e75| 93fb6802|    75e6d450|  43050|8/20/2019 9:30:46 PM|8/20/2019 9:46:48 PM|         0|         4.39|     3.48|
|a84c3f6f| 25fc9273|    5ba48e89|  43020| 9/8/2019 2:42:41 AM| 9/8/2019 2:59:56 AM|         0|         2.04|     1.54|
|a84c5ce2| 484c4d0c|    fdb2ef59|  43006|6/10/2019 9:29:18 AM|6/10/2019 9:55:36 AM|         0|         7.37|     5.83|
|a84c7980| 4ae02c4b|    f9cbc467|  43179|8/26/20

In [None]:
#Count of non-null values for every column
from pyspark.sql.functions import col, when, count

# One aggregate per column: count() only counts rows where when() produced a value,
# i.e. rows in which that column is not null.
non_null_counts = [
    count(when(col(c).isNotNull(), c)).alias(c)
    for c in trip_df.columns
]
trip_df.select(non_null_counts).show()

+-------+---------+------------+-------+---------+-----------+----------+-------------+---------+
|     id|driver_id|passenger_id|city_id|call_time|finish_time|surge_rate|trip_distance|trip_fare|
+-------+---------+------------+-------+---------+-----------+----------+-------------+---------+
|2318357|  2318357|     2318357|2318357|  2318357|    2318357|   2318351|      2295118|  2318357|
+-------+---------+------------+-------+---------+-----------+----------+-------------+---------+



In [None]:
#Count of null values for every column
from pyspark.sql.functions import col, when, count

# One aggregate per column: count() only counts rows where when() produced a value,
# i.e. rows in which that column is null.
null_counts = [
    count(when(col(c).isNull(), c)).alias(c)
    for c in trip_df.columns
]
trip_df.select(null_counts).show()

+---+---------+------------+-------+---------+-----------+----------+-------------+---------+
| id|driver_id|passenger_id|city_id|call_time|finish_time|surge_rate|trip_distance|trip_fare|
+---+---------+------------+-------+---------+-----------+----------+-------------+---------+
|  0|        0|           0|      0|        0|          0|         6|        23239|        0|
+---+---------+------------+-------+---------+-----------+----------+-------------+---------+



https://sparkbyexamples.com/pyspark/pyspark-count-of-non-null-nan-values-in-dataframe/

In [None]:
import numpy as np
from pyspark.sql.functions import isnan

# Count rows with a usable trip_distance: not the text 'None' or 'NULL', not NaN, not null.
distance = trip_df.trip_distance
has_valid_distance = (
    ~distance.contains('None')
    & ~distance.contains('NULL')
    & ~isnan(distance)
    & ~distance.isNull()
)
print(trip_df.filter(has_valid_distance).count())

2295118


In [None]:
import numpy as np
from pyspark.sql.functions import isnan

# Keep only rows with a usable trip_distance: not the text 'None' or 'NULL', not NaN, not null.
distance = trip_df.trip_distance
has_valid_distance = (
    ~distance.contains('None')
    & ~distance.contains('NULL')
    & ~isnan(distance)
    & ~distance.isNull()
)
trip_df_filtered = trip_df.filter(has_valid_distance)

In [None]:
# Preview the filtered trips without truncating long cell values.
trip_df_filtered.show(truncate=False)

+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|id      |driver_id|passenger_id|city_id|call_time            |finish_time          |surge_rate|trip_distance|trip_fare|
+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|a84c0db2|cb64172e |2ea4359d    |43006  |9/8/2019 10:01:28 PM |9/8/2019 10:15:54 PM |0.3       |1.92         |4.36     |
|a84c2d10|3034e6fe |e1bd0c8d    |43006  |7/1/2019 8:56:28 PM  |7/1/2019 9:22:09 PM  |0         |9.36         |6.91     |
|a84c3e75|93fb6802 |75e6d450    |43050  |8/20/2019 9:30:46 PM |8/20/2019 9:46:48 PM |0         |4.39         |3.48     |
|a84c3f6f|25fc9273 |5ba48e89    |43020  |9/8/2019 2:42:41 AM  |9/8/2019 2:59:56 AM  |0         |2.04         |1.54     |
|a84c5ce2|484c4d0c |fdb2ef59    |43006  |6/10/2019 9:29:18 AM |6/10/2019 9:55:36 AM |0         |7.37         |5.83     |
|a84c7980|4ae02c4b |f9cbc467    

https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/

https://sparkbyexamples.com/pyspark/pyspark-convert-string-type-to-double-type-float-type/

In [None]:
# Cast surge_rate to double so that it can be averaged.
surge_as_double = trip_df_filtered.surge_rate.cast('double')
trip_df_filtered = trip_df_filtered.withColumn("surge_rate", surge_as_double)

# Average surge rate per city.
surge_by_city = trip_df_filtered.groupBy("city_id").avg("surge_rate")
surge_by_city.show(truncate=False)

In [None]:
import pyspark.sql.functions as func

# Cast trip_distance to double so that it can be compared numerically.
trip_df_filtered = trip_df_filtered.withColumn("trip_distance",trip_df_filtered.trip_distance.cast('double'))

# Show trips with a negative distance. The comparison is strictly `< 0`: zero-distance
# trips are kept by the `>= 0` clean-up further down, so `<= 0` would wrongly list them
# as negative as well.
trip_df_filtered.filter(func.col("trip_distance") < 0).show()

In [None]:
# Count of negative trips. Strictly `< 0`, so that zero-distance trips are not counted
# both here and in the `>= 0` count (with `<= 0` the two counts overlap).
trip_df_filtered.filter(func.col("trip_distance") < 0).count()

15257

In [None]:
# count of non-negative trips (the filter is >= 0, so zero-distance trips are included)
trip_df_filtered.filter(func.col("trip_distance") >= 0).count()

2280249

In [None]:
# Keep only trips whose distance is zero or greater, then report how many remain.
is_non_negative = func.col("trip_distance") >= 0
trip_df_filtered = trip_df_filtered.where(is_non_negative)
trip_df_filtered.count()

2280249

In [None]:
from pyspark.sql import functions as F

# Average, minimum and maximum trip_distance over the filtered trips.
distance = F.col('trip_distance')
summary_exprs = [
    F.avg(distance).alias('avg_trip'),
    F.min(distance).alias('min_trip_distance'),
    F.max(distance).alias('max_trip_distance'),
]
trip_df_filtered.agg(*summary_exprs).show()

+-----------------+-----------------+-----------------+
|         avg_trip|min_trip_distance|max_trip_distance|
+-----------------+-----------------+-----------------+
|6.329764196804844|              0.0|            296.1|
+-----------------+-----------------+-----------------+



https://sparkbyexamples.com/pyspark/pyspark-where-filter/

In [None]:
# Drivers who have a trip at the minimum (non-negative) distance, 0.0

columns = ["driver_id", "trip_distance"]
is_min_trip = trip_df_filtered.trip_distance == 0.0
trip_df_filtered.where(is_min_trip).select(*columns).show(truncate=False)

+---------+-------------+
|driver_id|trip_distance|
+---------+-------------+
|9019790c |0.0          |
|5ffefeda |0.0          |
|e3d08377 |0.0          |
|d78ff215 |0.0          |
|e69ebba3 |0.0          |
|1483947c |0.0          |
|75d35e57 |0.0          |
|fc30d010 |0.0          |
|afba9024 |0.0          |
|5ff7632a |0.0          |
|a04d8ad6 |0.0          |
|c82c8d69 |0.0          |
|c49f6816 |0.0          |
|60f8ef4a |0.0          |
|50f07fc7 |0.0          |
|f78d2815 |0.0          |
|c805e5eb |0.0          |
|c2cfd970 |0.0          |
|a4d7a91a |0.0          |
|ef2cf645 |0.0          |
+---------+-------------+
only showing top 20 rows



In [None]:
# Remaining trips (distance other than 0) of driver 9019790c, who has a 0-distance trip
is_other_trip = trip_df_filtered.trip_distance != 0.0
is_this_driver = trip_df_filtered.driver_id == "9019790c"
trip_df_filtered.where(is_other_trip & is_this_driver).show(truncate=False)

+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|id      |driver_id|passenger_id|city_id|call_time            |finish_time          |surge_rate|trip_distance|trip_fare|
+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|b4f7f018|9019790c |7683ae1c    |43006  |7/3/2019 6:54:10 PM  |7/3/2019 7:11:59 PM  |-0.2      |3.73         |3.16     |
|4d9129be|9019790c |3cd84a02    |43006  |8/11/2019 12:18:15 AM|8/11/2019 12:56:52 AM|-0.2      |15.39        |27.38    |
+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+



In [None]:
# Driver(s) with a trip at the maximum distance found above, 296.1

is_max_trip = trip_df_filtered.trip_distance == 296.1
trip_df_filtered.where(is_max_trip).select(*columns).show(truncate=False)

+---------+-------------+
|driver_id|trip_distance|
+---------+-------------+
|704642b5 |296.1        |
+---------+-------------+



In [None]:
# Remaining trips (distance other than 296.1) of driver 704642b5, who has the longest trip
is_other_trip = trip_df_filtered.trip_distance != 296.1
is_this_driver = trip_df_filtered.driver_id == "704642b5"
trip_df_filtered.where(is_other_trip & is_this_driver).show(truncate=False)

+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|id      |driver_id|passenger_id|city_id|call_time            |finish_time          |surge_rate|trip_distance|trip_fare|
+--------+---------+------------+-------+---------------------+---------------------+----------+-------------+---------+
|c7d3b537|704642b5 |7911406f    |43116  |8/3/2019 8:05:49 AM  |8/3/2019 8:25:20 AM  |0.0       |3.21         |1.66     |
|1240b566|704642b5 |106e0be6    |43116  |7/5/2019 6:20:55 PM  |7/5/2019 6:50:06 PM  |0.0       |11.09        |4.55     |
|5189f92a|704642b5 |a1f12605    |43116  |7/18/2019 11:56:14 PM|7/19/2019 12:05:16 AM|0.0       |2.61         |1.2      |
|75f5bbc7|704642b5 |5c12690c    |43116  |7/3/2019 2:58:20 AM  |7/3/2019 3:25:53 AM  |0.0       |11.7         |4.01     |
|e22d78b7|704642b5 |7d81049e    |43116  |5/15/2019 10:40:45 PM|5/15/2019 10:54:24 PM|0.0       |4.65         |5.31     |
|3d98b2f8|704642b5 |ee6111f1    