In [2]:
#Install Pyspark
!pip install pyspark



In [3]:
#import spark libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os

In [4]:
# set Java home 
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-18.0.2.1"


In [5]:
# Initiate spark context
conf = SparkConf() \
    .setAppName("Example") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath","C:/pyspark/*")

In [6]:
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [7]:
spark


In [8]:
# read data from my local storage/machine
df=spark.read.options(delimiter=",", header=True).csv(r"C:\Users\Liltimz\Desktop\New folder\yellow_tripdata_2023-01.csv")
df.show() 


+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|_c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|  0|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0| 

In [9]:
#print schema of my data
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



In [10]:
#filter a column (airport_fee)
df.filter(df.airport_fee == "1").show(truncate=False)

+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|_c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-

In [11]:
# show four columns into a dataframe
df1 = df[['VendorID', 'trip_distance', 'PULocationID', 'DOLocationID']]
df1.show()

+--------+-------------+------------+------------+
|VendorID|trip_distance|PULocationID|DOLocationID|
+--------+-------------+------------+------------+
|       2|         0.97|         161|         141|
|       2|          1.1|          43|         237|
|       2|         2.51|          48|         238|
|       1|          1.9|         138|           7|
|       2|         1.43|         107|          79|
|       2|         1.84|         161|         137|
|       2|         1.66|         239|         143|
|       2|         11.7|         142|         200|
|       2|         2.95|         164|         236|
|       2|         3.01|         141|         107|
|       2|          1.8|         234|          68|
|       1|          7.3|          79|         264|
|       2|         3.23|         164|         143|
|       2|        11.43|         138|          33|
|       2|         2.95|          33|          61|
|       2|         1.52|          79|         186|
|       2|         2.23|       

In [12]:

PULocationID = df.groupBy('PULocationID').count()
print(PULocationID.show())

+------------+------+
|PULocationID| count|
+------------+------+
|         125| 13977|
|           7|  1510|
|          51|   248|
|         124|   189|
|         169|    92|
|         205|   230|
|         234| 81221|
|         232|  3257|
|          54|    35|
|          15|    34|
|         155|   147|
|         132|160030|
|         154|    29|
|         200|    76|
|          11|    40|
|         101|    53|
|         138| 89188|
|          69|   173|
|          29|    76|
|          42|  2705|
+------------+------+
only showing top 20 rows

None


In [13]:
df.createOrReplaceTempView("uber_data")
output = spark.sql('SELECT * from uber_data limit 10')
output.show()

+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|_c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|  0|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0| 

In [14]:
df.createOrReplaceTempView("uber_data")
output = spark.sql("SELECT * from uber_data where VendorID='2'")
output.show()

+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|_c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|  0|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0| 

In [18]:
dest_tbl = 'public."spark_table"'
database = "postgres"
password = ""
user = "postgres"

In [21]:
df.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://localhost:5432/{database}") \
    .option("dbtable", dest_tbl) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
