# Problem Statement: We need to import products from a CSV file into a database. There are half a million product records to be imported.

In [1]:
import pyspark
from delta import *

In [2]:
# Local Spark session for the bulk import: run on every local core, pull in the
# Postgres JDBC driver and Delta Lake, and register Delta's SQL extension/catalog
# so MERGE is available later in the notebook.
# NOTE(review): the --add-opens flags look like a workaround for running Spark 3.2
# on a newer JDK; confirm they are still needed for the Java version in use.
# NOTE(review): with master "local[*]" there are no separate executors, so
# 'spark.executor.cores' is presumably ignored; confirm against the Spark docs.
spark_settings = [
    ('spark.driver.extraJavaOptions',
     "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED"),
    ('spark.jars.packages',
     'org.postgresql:postgresql:42.2.10,io.delta:delta-core_2.12:1.1.0'),
    ("spark.sql.extensions",
     "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog",
     "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    ('spark.executor.cores', 4),
    ("spark.default.parallelism", 4),
]

config = pyspark.SparkConf()
config.setMaster("local[*]")
config.setAll(spark_settings)

session_builder = pyspark.sql.SparkSession.builder.appName('large_file_processor')
spark = session_builder.config(conf=config).getOrCreate()


22/06/02 19:53:09 WARN Utils: Your hostname, Admins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.2.5 instead (on interface en0)
22/06/02 19:53:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/pranavwadekar/.virtualenvs/venv-personal/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pranavwadekar/.ivy2/cache
The jars for the packages stored in: /Users/pranavwadekar/.ivy2/jars
org.postgresql#postgresql added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5540d1ea-d1e2-40cb-a772-b39b1a1f1f66;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.10 in central
	found io.delta#delta-core_2.12;1.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 308ms :: artifacts dl 17ms
	:: modules in use:
	io.delta#delta-core_2.12;1.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	org.postgresql#postgresql;42.2.10 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules    

In [3]:
# Show which Spark version the session runs; the Delta package pinned in the
# config cell must be built for this version.
spark.sparkContext.version

'3.2.1'

In [4]:
from pyspark.sql.functions import col

# Load the raw product export. Rows without a SKU cannot be matched in the
# later upsert, so they are dropped.
# multiLine=True keeps a quoted field (e.g. a description containing newlines)
# inside one record. Without it every embedded newline starts a new "record":
# the description is truncated at the newline and the continuation becomes a
# bogus row with null sku/description (see the unfiltered read of the same
# data in the Learnings section), which the sku filter then silently discards.
# A .gz file should be read by a single task anyway, so multiLine costs no
# parallelism here.
df = spark.read.format("csv").options(
    path="/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/products.csv.gz",
    inferSchema=True, header=True, multiLine=True).load().filter(col("sku").isNotNull())


                                                                                

In [5]:
# Preview the loaded products (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|                name|                 sku|         description|
+--------------------+--------------------+--------------------+
|         Bryce Jones|pranav-lay-raise-...|Art community flo...|
|John Robinson upd...|    cup-return-guess|Produce successfu...|
|      Theresa Taylor|           step-onto|Choice should lea...|
|        Roger Huerta| citizen-some-middle|Important fight w...|
|        John Buckley|      term-important|Alone maybe educa...|
|     Tiffany Johnson|       do-many-avoid|     Born tree wind.|
|      Roy Golden DDS|     help-return-art|Pm daughter thous...|
|        David Wright| listen-enough-check|Under its near. N...|
|       Anthony Burch|    anyone-executive|I lose positive m...|
|        Lauren Smith|  grow-we-decide-job|Smile yet fear so...|
|          Bailey Cox|     suggest-similar|Peace happy lette...|
|       Jeffrey Davis|     million-quality|See sea guy fire ...|
|        Lisa Sanchez|   

In [6]:
# Spread the rows over 10 partitions; the JDBC write below sends each
# partition over its own connection, so this sets the insert parallelism.
df1 = df.repartition(numPartitions=10)

In [7]:
# Sanity check: should report the 10 partitions requested in the previous cell.
df1.rdd.getNumPartitions()

[Stage 3:>                                                          (0 + 1) / 1]

10

In [9]:
%%time

# writing records in postgres database with 4 parallel threads
df1.write \
  .format("jdbc") \
  .mode("Append") \
  .option("driver", "org.postgresql.Driver") \
  .option("url", "jdbc:postgresql://localhost:5432/postgres") \
  .option("dbtable", "sample.products") \
  .option("user", "sampleuser") \
  .option("password", "samplepass") \
  .save()

[Stage 6:>                                                        (0 + 10) / 10]

CPU times: user 3.9 ms, sys: 3.58 ms, total: 7.48 ms
Wall time: 6.94 s


                                                                                

# UPSERT LOGIC

In [8]:
# Read back what is currently stored in Postgres; this snapshot becomes the
# merge target (the "dest" side of the upsert), not the source.
# NOTE(review): without partitionColumn/lowerBound/upperBound the JDBC read
# goes through a single connection and partition.
df2 = spark.read.format("jdbc").options(
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/postgres",
    dbtable="sample.products",
    user="sampleuser",
    password="samplepass",
).load()

In [9]:
# Preview the database snapshot (first 20 rows, long values truncated).
df2.show(n=20, truncate=True)

+--------------------+-------------------+--------------------+
|                name|                sku|         description|
+--------------------+-------------------+--------------------+
|     Candace Francis|  a-adult-situation|Fine month cold a...|
|         Lance Baker|   a-agent-although|Speak ok executiv...|
|         Jason Smith|   a-agree-job-hour|Choose recognize ...|
|          Mark Payne|       a-among-beat|Alone drive feeli...|
|        Jose Johnson|    a-appear-ground|Offer remain wife...|
|      John Christian|    a-arrive-should|Another make gun ...|
|      Jessica Wagner|a-art-song-economic|Garden sound pres...|
|       Jeff Shepherd|a-assume-town-clear|Report right this...|
|        Louis Taylor| a-at-away-girl-tax|Late husband inst...|
|        Brent Willis|        a-authority|Animal law networ...|
|         Mark Miller|   a-available-like|Land yet reach ri...|
|       Carolyn Nixon|    a-available-per|Skin participant ...|
|           Juan Kerr|  a-avoid-game-wal

                                                                                

In [10]:
# Materialise the database snapshot as a Delta table: Delta's MERGE provides an
# ACID upsert, which plain JDBC writes do not. Any previous table (and schema)
# at this path is replaced.
delta_path = "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/delta-table"
(
    df2.write
    .format("delta")
    .mode("OverWrite")
    .option("overwriteSchema", "true")
    .save(delta_path)
)


                                                                                

In [11]:
# Open the Delta table written above as a DeltaTable handle so MERGE can be
# run against it. Import only the name that is used instead of a star import.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(
    spark,
    "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/delta-table")


In [12]:
# Row count of the current Delta snapshot; should equal the number of products loaded.
deltaTable.toDF().count()

500000

In [13]:
# Load the incremental file of added and modified products (write your own CSV).
# Rows whose sku already exists in the table are treated as updates by the
# merge below, and the rest as inserts. Use the same reader options as the
# initial load, including multiLine=True, so that quoted descriptions
# containing newlines stay in one record instead of being truncated.
df4 = spark.read.format("csv").options(
      path="/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/updated_products.csv.gz",
      inferSchema=True, header=True, multiLine=True).load().filter(col("sku").isNotNull())


In [14]:
# Preview the incoming changes (first 20 rows, long values truncated).
df4.show(n=20, truncate=True)

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+
|                name|                 sku|         description|
+--------------------+--------------------+--------------------+
|         Bryce Jones|pranav-lay-raise-...|Art community flo...|
|John Robinson upd...|    cup-return-guess|Produce successfu...|
+--------------------+--------------------+--------------------+





In [15]:
# An upsert is a Delta MERGE keyed on sku. A source row whose sku already
# exists in the table overwrites that row's columns; every other source row is
# inserted as a new product.
# NOTE(review): MERGE fails if several df4 rows share one sku and match the same
# target row, so deduplicate df4 by sku first if that can happen.
matched_assignments = {
    "sku": "source.sku",
    "name": "source.name",
    "description": "source.description",
}

merge_builder = deltaTable.alias("dest").merge(
    source=df4.alias("source"),
    condition="dest.sku = source.sku",
)
merge_builder = merge_builder.whenMatchedUpdate(set=matched_assignments)
merge_builder = merge_builder.whenNotMatchedInsertAll()
merge_builder.execute()



                                                                                

In [27]:
# Publish the merged Delta snapshot back to Postgres, replacing the table contents.
# truncate=true makes "OverWrite" issue TRUNCATE TABLE and re-insert, instead of
# DROP TABLE + CREATE TABLE with Spark-chosen column types. The existing table
# definition (column types, primary key/indexes, grants) therefore survives.
# NOTE: this rewrites every row rather than upserting in place. If the write
# fails midway, the table is left partially populated.
deltaTable.toDF().write \
  .format("jdbc") \
  .mode("OverWrite") \
  .option("truncate", "true") \
  .option("driver", "org.postgresql.Driver") \
  .option("url", "jdbc:postgresql://localhost:5432/postgres") \
  .option("dbtable", "sample.products") \
  .option("user", "sampleuser") \
  .option("password", "samplepass") \
  .save()

                                                                                

# Learnings

In [3]:
# Minimal session for the experiments below: only the Postgres JDBC driver.
# NOTE(review): getOrCreate() returns an already-running session if one exists
# in this kernel, so these settings only take full effect in a fresh kernel.
learning_settings = [
    ('spark.driver.extraJavaOptions',
     "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED"),
    ('spark.jars.packages', 'org.postgresql:postgresql:42.2.10'),
]
config = pyspark.SparkConf().setAll(learning_settings)

spark = (
    pyspark.sql.SparkSession.builder
    .appName('test')
    .config(conf=config)
    .getOrCreate()
)


22/05/29 15:39:21 WARN Utils: Your hostname, Admins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.102 instead (on interface en0)
22/05/29 15:39:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/pranavwadekar/.virtualenvs/venv-personal/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pranavwadekar/.ivy2/cache
The jars for the packages stored in: /Users/pranavwadekar/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-91610a2e-5a43-4cae-b13d-8c0f7ff0b67f;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.10 in central
:: resolution report :: resolve 143ms :: artifacts dl 4ms
	:: modules in use:
	org.postgresql#postgresql;42.2.10 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-91610a2e-5a43-4cae-b13d-8c0f

In [4]:
# Confirm the JVM flags from the SparkConf reached the running session.
java_opts_key = 'spark.driver.extraJavaOptions'
spark.conf.get(java_opts_key)

'--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED'

In [18]:
# Load the survey CSV; the cells below expect 'Gender' and 'treatment' columns.
# NOTE(review): the preview captured under the next cell shows name/sku/description
# columns, i.e. the products data rather than the survey. Confirm this path
# points at the survey export, otherwise df.select('Gender', 'treatment') fails
# on a fresh run.
df = spark.read.csv(
    "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/survey.csv",
    header=True,
    inferSchema=True,
)


                                                                                

In [19]:
# Preview the loaded CSV (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+--------------------+-------------------+--------------------+
|                name|                sku|         description|
+--------------------+-------------------+--------------------+
|         Bryce Jones| lay-raise-best-end|Art community flo...|
|       John Robinson|   cup-return-guess|Produce successfu...|
|      Theresa Taylor|          step-onto|Choice should lea...|
|Often stuff profe...|               null|                null|
|        Roger Huerta|citizen-some-middle|Important fight w...|
|        John Buckley|     term-important|Alone maybe educa...|
|Want benefit mana...|               null|                null|
|     Tiffany Johnson|      do-many-avoid|     Born tree wind.|
|Boy marriage begi...|               null|                null|
|Certain throw exe...|               null|                null|
|      Roy Golden DDS|    help-return-art|Pm daughter thous...|
|Process eat emplo...|               null|                null|
|Increase author w...|               nul

In [7]:
# df1 = df.repartition(7, 'Country')#.

In [8]:
# df1.rdd.getNumPartitions()

In [9]:
# Keep only the two columns the aggregation needs.
df1 = df.select(['Gender', 'treatment'])

In [10]:
# Preview the projected columns (first 20 rows, long values truncated).
df1.show(n=20, truncate=True)

+------+---------+
|Gender|treatment|
+------+---------+
|Female|      Yes|
|     M|       No|
|  Male|       No|
|  Male|      Yes|
|  Male|       No|
|  Male|       No|
|Female|      Yes|
|     M|       No|
|Female|      Yes|
|  Male|       No|
|  Male|      Yes|
|  male|       No|
|female|      Yes|
|  Male|       No|
|  Male|       No|
|female|      Yes|
|  Male|      Yes|
|  Male|      Yes|
|  male|       No|
|  Male|       No|
+------+---------+
only showing top 20 rows



In [11]:
from pyspark.sql.functions import when

# Turn the treatment answer into two 0/1 indicator columns. A 'Yes' row gets
# All-Yes=1 and a 'No' row gets All-No=1. Any other value, including null,
# gets 0 in both.
yes_flag = when(df1.treatment == 'Yes', 1).otherwise(0).alias('All-Yes')
no_flag = when(df1.treatment == 'No', 1).otherwise(0).alias('All-No')
df2 = df1.select('Gender', yes_flag, no_flag)

In [12]:
# Preview the indicator columns (first 20 rows, long values truncated).
df2.show(n=20, truncate=True)

+------+-------+------+
|Gender|All-Yes|All-No|
+------+-------+------+
|Female|      1|     0|
|     M|      0|     1|
|  Male|      0|     1|
|  Male|      1|     0|
|  Male|      0|     1|
|  Male|      0|     1|
|Female|      1|     0|
|     M|      0|     1|
|Female|      1|     0|
|  Male|      0|     1|
|  Male|      1|     0|
|  male|      0|     1|
|female|      1|     0|
|  Male|      0|     1|
|  Male|      0|     1|
|female|      1|     0|
|  Male|      1|     0|
|  Male|      1|     0|
|  male|      0|     1|
|  Male|      0|     1|
+------+-------+------+
only showing top 20 rows



In [15]:
from pyspark.sql.functions import sum, count

# df3 = df2.groupBy('Gender').agg(sum('All-Yes'), sum('All-No'))

In [14]:
# To query a DataFrame with SQL it must first be registered as a view;
# spark.sql below refers to it by this name.
view_name = 'someTemp'
df.createOrReplaceTempView(view_name)

In [15]:
# Normalise the free-text gender answers to 'Male' / 'Female' and count the
# treatment Yes/No answers per group. Answers matching neither list (typos,
# other identities, nulls) fall into an 'Other' bucket that is excluded. The
# bucket was previously mislabelled 'Transgender', which misrepresents those
# rows. Answers are trimmed before matching, so no literal needs a trailing
# space. The explicit ORDER BY makes the output order deterministic.
dfout = spark.sql(
  """select gender, sum(yes) sum_yes, sum(no) sum_no
     from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal','male (cis)',
                                                    'make','man','msle','mail','malr','cis man',
                                                    'cis male') then 'Male'
                       when lower(trim(gender)) in ('cis female','f','female','woman','femake',
                                                    'cis-female/femme','female (cis)','femail') then 'Female'
                       else 'Other'
                  end as gender,
                  case when treatment == 'Yes' then 1 else 0 end as yes,
                  case when treatment == 'No' then 1 else 0 end as no
           from someTemp)
     where gender != 'Other'
     group by gender
     order by gender""")

In [16]:
# Display the per-gender Yes/No totals.
dfout.show(n=20, truncate=True)

+------+-------+------+
|gender|sum_yes|sum_no|
+------+-------+------+
|Female|    170|    77|
|  Male|    450|   541|
+------+-------+------+



In [17]:
# Persist the aggregated survey counts to the Postgres table sample.surveys_aggr.
# The table is replaced on every run (mode "overwrite").
dfout.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("driver", "org.postgresql.Driver") \
  .option("url", "jdbc:postgresql://localhost:5432/postgres") \
  .option("dbtable", "sample.surveys_aggr") \
  .option("user", "sampleuser") \
  .option("password", "samplepass") \
  .save()

# Writing files in different formats

In [15]:
# Save the per-row Yes/No indicator frame as Parquet, replacing any previous output.
parquet_dir = "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/parquet_data"
df2.write.mode("overwrite").parquet(parquet_dir)

In [16]:
# Round-trip: read the Parquet output back and re-export it as JSON.
# NOTE(review): the "mode" (failfast) option controls malformed-record handling
# for text sources such as CSV/JSON; it is likely a no-op for Parquet, so confirm.
# NOTE: this rebinds `df`, replacing the survey DataFrame used by earlier cells.
parquet_dir = "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/parquet_data"
json_dir = "/Users/pranavwadekar/Desktop/personal_projects/pyspark_learning/base_data/json_data"

df = spark.read.option("mode", "failfast").parquet(parquet_dir)
df.write.mode("overwrite").json(json_dir)