<a id="top">

# Table of Contents

- [Reading from CSV file](#csv)
- [Sorting on a single column: sort() / orderBy()](#sort_single)
- [Sorting on multiple columns sort() / orderBy()](#sort_multiple)
- [Removing duplicates across all columns](#distinct)
- [Removing duplicates across specific column(s)](#drop_duplicates)
- [Create new column based on values from another column](#new_column)
- [Using User-Defined Function UDF to Create New Column](#udf)
- [Using `when` to duplicate IF-ELSE Logic To Create New Column](#when)
- [Using sql to create new column](#sql_when)
- [Re-Arrange Columns using select()](#re-order)
- [Joining or merging](#merging)
- [Filtering](#filtering)
- [Connecting to relational databases](#databases)

## Input / Reading Data

[[Back to Top](#top)]

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

Sample data can be obtained from [here](https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv).

<a id="csv">

#### Read csv file by infering or guessing schema

[[Back to Top](#top)]

In [2]:
df = spark.read.csv('data/cars.csv', header=True, sep=";")

In [3]:
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [4]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



#### Explicitly defining schema

In [5]:
from pyspark.sql.types import *

In [6]:
schema = StructType([
    StructField("Car", StringType(),True),
    StructField("MPG", DoubleType(),True),
    StructField("Cylinders", IntegerType(),True),
    StructField("Displacement", DoubleType(), True),
    StructField("Horsepower", DoubleType(), True),
    StructField("Weight", DoubleType(), True),
    StructField("Acceleration", IntegerType(), True),
    StructField("Model", IntegerType(), True),
    StructField("Origin", StringType(), True)
  ])

In [19]:
df = spark.read.csv('data/cars.csv', header=True, sep=";", schema=schema)

In [8]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: integer (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [9]:
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        null|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        null|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        null|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        null|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        null|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



<a id="sort_single">

## Sorting on Single Column

[[Back to Top](#top)]

In [12]:
from pyspark.sql.functions import col

df.sort(df.MPG.asc())

# Alternative methods if your column names has a space in it
# df.orderBy(['MPG'], ascending=[False]).show(10, truncate=False)
# df.sort(col("MPG").asc())

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Citroen DS-21 Pallas,0.0,4,133.0,115.0,3090.0,,70,Europe
Ford Mustang Boss...,0.0,8,302.0,140.0,3353.0,,70,US
Ford Torino (sw),0.0,8,351.0,153.0,4034.0,,70,US
Volkswagen Super ...,0.0,4,97.0,48.0,1978.0,,71,Europe
Saab 900s,0.0,4,121.0,110.0,2800.0,,81,Europe
Chevrolet Chevell...,0.0,8,350.0,165.0,4142.0,,70,US
Plymouth Satellit...,0.0,8,383.0,175.0,4166.0,,70,US
AMC Rebel SST (sw),0.0,8,360.0,175.0,3850.0,,70,US
Hi 1200D,9.0,8,304.0,193.0,4732.0,,70,US
Ford F250,10.0,8,360.0,215.0,4615.0,,70,US


<a id="sort_multiple">

## Sorting on Multiple Columns

[[Back to Top](#top)]

In [13]:
from pyspark.sql.functions import col

df.sort(df.MPG.asc(), df.Displacement.desc())

# use col(column_name) when your column name has a space in it
# df.orderBy(['MPG','Displacement'], ascending=[False, True]).show(10, truncate=False)
# df.sort(col("MPG").asc())

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Plymouth Satellit...,0.0,8,383.0,175.0,4166.0,,70,US
AMC Rebel SST (sw),0.0,8,360.0,175.0,3850.0,,70,US
Ford Torino (sw),0.0,8,351.0,153.0,4034.0,,70,US
Chevrolet Chevell...,0.0,8,350.0,165.0,4142.0,,70,US
Ford Mustang Boss...,0.0,8,302.0,140.0,3353.0,,70,US
Citroen DS-21 Pallas,0.0,4,133.0,115.0,3090.0,,70,Europe
Saab 900s,0.0,4,121.0,110.0,2800.0,,81,Europe
Volkswagen Super ...,0.0,4,97.0,48.0,1978.0,,71,Europe
Hi 1200D,9.0,8,304.0,193.0,4732.0,,70,US
Ford F250,10.0,8,360.0,215.0,4615.0,,70,US


<a id="distinct">

## Removing Duplicates Across All Columns Using distinct()

[[Back to Top](#top)]

In [14]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4600), \
    ("James", "Sales", 3000)
]
columns= ["employee_name", "department", "salary"]

# Create DataFrame
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4600  |
|James        |Sales     |3000  |
+-------------+----------+------+



In [15]:
distinctDF = df.distinct()
distinctDF.show(truncate=False)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4600  |
+-------------+----------+------+



<a id="drop_duplicates">

## Removing Duplicates Across Specific Columns Using drop_duplicates()

[[Back to Top](#top)]

In [16]:
dropDisDF = df.drop_duplicates(["department","salary"])
dropDisDF.show(truncate=False)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
+-------------+----------+------+



## Data Transformations

<a id="new_column">

#### Create new column based on values from another column

[[Back to Top](#top)]

In [17]:
from pyspark.sql.functions import split, col

In [20]:
df = df.withColumn(
    "Make", split(col("Car"), " ")
    .getItem(0)
).withColumn(
    "Model", split(col("Car"), " ")
    .getItem(1)
)

In [21]:
df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+---------+------+---------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model    |Origin|Make     |
+-------------------------+----+---------+------------+----------+------+------------+---------+------+---------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|null        |Chevelle |US    |Chevrolet|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|null        |Skylark  |US    |Buick    |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|null        |Satellite|US    |Plymouth |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|null        |Rebel    |US    |AMC      |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449.0|null        |Torino   |US    |Ford     |
+-------------------------+----+---------+------------+----------+------+------------+--

<a id="udf">

#### Using UDF to Create New Column

[[Back to Top](#top)]

In [25]:
data = [
    ("bacon", 4.0),
    ("pulled pork", 3.0),
    ("bacon", 12.0),
    ("pastrami", 6.0),
    ("corned beef", 7.5),
    ("bacon", 8.0),
    ("pastrami", 3.0),
    ("honey ham", 5.0),
    ("nova lox", 6.0),
  ]

In [26]:
schema = StructType([
    StructField("Food", StringType(),True),
    StructField("Ounces", DoubleType(),True),
  ])

In [27]:
df_food = spark.createDataFrame(data=data,schema=schema)

In [28]:
df_food.show()

+-----------+------+
|       Food|Ounces|
+-----------+------+
|      bacon|   4.0|
|pulled pork|   3.0|
|      bacon|  12.0|
|   pastrami|   6.0|
|corned beef|   7.5|
|      bacon|   8.0|
|   pastrami|   3.0|
|  honey ham|   5.0|
|   nova lox|   6.0|
+-----------+------+



In [29]:
from pyspark.sql.functions import udf

In [30]:
def food2animal(column):
    if column == 'bacon':
        return 'pig'
    elif column == 'pulled pork':
        return 'pig'
    elif column == 'pastrami':
        return 'cow'
    elif column == 'corned beef':
        return 'cow'
    elif column == 'honey ham':
        return 'pig'
    else:
        return 'salmon'

In [31]:
food2animal_udf = udf(food2animal, StringType())

In [32]:
df_food_with_animal = df_food.withColumn("animal", food2animal_udf("Food"))

In [33]:
df_food_with_animal.show()

+-----------+------+------+
|       Food|Ounces|animal|
+-----------+------+------+
|      bacon|   4.0|   pig|
|pulled pork|   3.0|   pig|
|      bacon|  12.0|   pig|
|   pastrami|   6.0|   cow|
|corned beef|   7.5|   cow|
|      bacon|   8.0|   pig|
|   pastrami|   3.0|   cow|
|  honey ham|   5.0|   pig|
|   nova lox|   6.0|salmon|
+-----------+------+------+



<a id="when">

#### Using `when` to duplicate IF-ELSE Logic To Create New Column

[[Back to Top](#top)]

In [34]:
from pyspark.sql.functions import when

In [35]:
data = [
    ("bacon", 4.0),
    ("pulled pork", 3.0),
    ("bacon", 12.0),
    ("pastrami", 6.0),
    ("corned beef", 7.5),
    ("bacon", 8.0),
    ("pastrami", 3.0),
    ("honey ham", 5.0),
    ("nova lox", 6.0),
  ]

schema = StructType([
    StructField("Food", StringType(),True),
    StructField("Ounces", DoubleType(),True),
  ])

df_food = spark.createDataFrame(data=data,schema=schema)

In [36]:
df_food.show()

+-----------+------+
|       Food|Ounces|
+-----------+------+
|      bacon|   4.0|
|pulled pork|   3.0|
|      bacon|  12.0|
|   pastrami|   6.0|
|corned beef|   7.5|
|      bacon|   8.0|
|   pastrami|   3.0|
|  honey ham|   5.0|
|   nova lox|   6.0|
+-----------+------+



In [37]:
df_food.withColumn(
    'Animal',
    when(col("Food") == 'bacon', 'pork')
    .when(col("Food") == 'pulled pork', 'pork')
    .when(col("Food") == 'pastrami', 'cow')
    .when(col("Food") == 'corned beef', 'cow')
    .when(col("Food") == 'honey ham', 'pig')
    .otherwise('salmon')
)

Food,Ounces,Animal
bacon,4.0,pork
pulled pork,3.0,pork
bacon,12.0,pork
pastrami,6.0,cow
corned beef,7.5,cow
bacon,8.0,pork
pastrami,3.0,cow
honey ham,5.0,pig
nova lox,6.0,salmon


<a id="re-order">

<a id="sql_when">

## Using sql to create new column

[[Back to Top](#top)]

In [39]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [40]:
data = [
    ("bacon", 4.0),
    ("pulled pork", 3.0),
    ("bacon", 12.0),
    ("pastrami", 6.0),
    ("corned beef", 7.5),
    ("bacon", 8.0),
    ("pastrami", 3.0),
    ("honey ham", 5.0),
    ("nova lox", 6.0),
  ]

schema = StructType([
    StructField("Food", StringType(),True),
    StructField("Ounces", DoubleType(),True),
  ])

df_food = spark.createDataFrame(data=data,schema=schema)

In [42]:
df_food.createOrReplaceTempView('food_table')
newDF = spark.sql(
    '''
    select *,
    case
    when Food = 'bacon' then 'pig'
    when Food = 'pulled pork' then 'pig'
    when Food = 'pastrami' then 'cow'
    when Food = 'corned beef' then 'cow'
    when Food = 'honey ham' then 'pig'
    else 'salmon' end as Animal from food_table
    '''
)
newDF.show()

+-----------+------+------+
|       Food|Ounces|Animal|
+-----------+------+------+
|      bacon|   4.0|   pig|
|pulled pork|   3.0|   pig|
|      bacon|  12.0|   pig|
|   pastrami|   6.0|   cow|
|corned beef|   7.5|   cow|
|      bacon|   8.0|   pig|
|   pastrami|   3.0|   cow|
|  honey ham|   5.0|   pig|
|   nova lox|   6.0|salmon|
+-----------+------+------+



#### Re-Arrange Columns using `select()`

[[Back to Top](#top)]

In [24]:
df.select('Make', 'Model', 'MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Origin')

Make,Model,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Origin
Chevrolet,Chevelle,18.0,8,307.0,130.0,3504.0,,US
Buick,Skylark,15.0,8,350.0,165.0,3693.0,,US
Plymouth,Satellite,18.0,8,318.0,150.0,3436.0,,US
AMC,Rebel,16.0,8,304.0,150.0,3433.0,,US
Ford,Torino,17.0,8,302.0,140.0,3449.0,,US
Ford,Galaxie,15.0,8,429.0,198.0,4341.0,,US
Chevrolet,Impala,14.0,8,454.0,220.0,4354.0,,US
Plymouth,Fury,14.0,8,440.0,215.0,4312.0,,US
Pontiac,Catalina,14.0,8,455.0,225.0,4425.0,,US
AMC,Ambassador,15.0,8,390.0,190.0,3850.0,,US


## Data Summarizations

[[Back to Top](#top)]

In [43]:
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+----------+------+---------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model     |Origin|Make     |
+--------------------------------+----+---------+------------+----------+------+------------+----------+------+---------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|null        |Chevelle  |US    |Chevrolet|
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|null        |Skylark   |US    |Buick    |
|Plymouth Satellite              |18.0|8        |318.0       |150.0     |3436.0|null        |Satellite |US    |Plymouth |
|AMC Rebel SST                   |16.0|8        |304.0       |150.0     |3433.0|null        |Rebel     |US    |AMC      |
|Ford Torino                     |17.0|8        |302.0       |140.0     |3449.0|null        |Torino    |US    |Ford     |
|Ford Galaxie 500       

#### Count of rows

In [44]:
df.count()

406

#### Counts by Groups within a Single Column

In [45]:
df.groupBy('Origin').count().withColumnRenamed('count', 'Count')

Origin,Count
Europe,73
US,254
Japan,79


#### Aggregations

In [46]:
from pyspark.sql.functions import mean 

In [47]:
df.groupBy(
    "Origin"
).agg(
    mean('MPG')
).show()

+------+------------------+
|Origin|          avg(MPG)|
+------+------------------+
|Europe|26.745205479452057|
|    US|19.688188976377948|
| Japan|30.450632911392397|
+------+------------------+



<a id="merging">

## Joining / Merging

[[Back to Top](#top)]

In [48]:
df_counts = df.groupBy('Origin').count().withColumnRenamed('count', 'Count')

In [49]:
df_avgs = df.groupBy(
    "Origin"
).agg(
    mean('MPG')
)

In [50]:
df_counts.show()

+------+-----+
|Origin|Count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [51]:
df_avgs = df_avgs.withColumnRenamed('avg(MPG)', 'Avg')

In [52]:
df_avgs.show()

+------+------------------+
|Origin|               Avg|
+------+------------------+
|Europe|26.745205479452057|
|    US|19.688188976377948|
| Japan|30.450632911392397|
+------+------------------+



In [53]:
df_counts.join(df_avgs, df_counts.Origin == df_avgs.Origin, 'inner').select(df_counts.Origin, df_counts.Count, df_avgs.Avg)

Origin,Count,Avg
Europe,73,26.745205479452057
US,254,19.688188976377948
Japan,79,30.4506329113924


As you can see, we have done an inner join between two dataframes. The following joins are supported by PySpark:

- inner (default)
- cross
- outer
- full
- full_outer
- left
- left_outer
- right
- right_outer
- left_semi
- left_anti

<a id="filtering">

## Filtering

[[Back to Top](#top)]

In [54]:
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+----------+------+---------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model     |Origin|Make     |
+--------------------------------+----+---------+------------+----------+------+------------+----------+------+---------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|null        |Chevelle  |US    |Chevrolet|
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|null        |Skylark   |US    |Buick    |
|Plymouth Satellite              |18.0|8        |318.0       |150.0     |3436.0|null        |Satellite |US    |Plymouth |
|AMC Rebel SST                   |16.0|8        |304.0       |150.0     |3433.0|null        |Rebel     |US    |AMC      |
|Ford Torino                     |17.0|8        |302.0       |140.0     |3449.0|null        |Torino    |US    |Ford     |
|Ford Galaxie 500       

In [55]:
df.filter(col('Make')=='Chevrolet').show(5)

+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|   Model|Origin|     Make|
+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        null|Chevelle|    US|Chevrolet|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|4354.0|        null|  Impala|    US|Chevrolet|
|Chevrolet Chevell...| 0.0|        8|       350.0|     165.0|4142.0|        null|Chevelle|    US|Chevrolet|
|Chevrolet Monte C...|15.0|        8|       400.0|     150.0|3761.0|        null|   Monte|    US|Chevrolet|
| Chevrolet Vega 2300|28.0|        4|       140.0|      90.0|2264.0|        null|    Vega|    US|Chevrolet|
+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
only showing top 5 rows



In [56]:
df.filter(col('Make').contains('Chev')).show(5)

+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|   Model|Origin|     Make|
+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        null|Chevelle|    US|Chevrolet|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|4354.0|        null|  Impala|    US|Chevrolet|
|Chevrolet Chevell...| 0.0|        8|       350.0|     165.0|4142.0|        null|Chevelle|    US|Chevrolet|
|Chevrolet Monte C...|15.0|        8|       400.0|     150.0|3761.0|        null|   Monte|    US|Chevrolet|
|           Chevy C20|10.0|        8|       307.0|     200.0|4376.0|        null|     C20|    US|    Chevy|
+--------------------+----+---------+------------+----------+------+------------+--------+------+---------+
only showing top 5 rows



In [57]:
df.filter(
    (col('Make').contains('Chev')) &
    (col('Cylinders') < 8)
).show()

+--------------------+----+---------+------------+----------+------+------------+--------+------+----------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|   Model|Origin|      Make|
+--------------------+----+---------+------------+----------+------+------------+--------+------+----------+
| Chevrolet Vega 2300|28.0|        4|       140.0|      90.0|2264.0|        null|    Vega|    US| Chevrolet|
|Chevrolet Chevell...|17.0|        6|       250.0|     100.0|3329.0|        null|Chevelle|    US| Chevrolet|
| Chevrolet Vega (sw)|22.0|        4|       140.0|      72.0|2408.0|        null|    Vega|    US| Chevrolet|
|      Chevrolet Vega|20.0|        4|       140.0|      90.0|2408.0|        null|    Vega|    US| Chevrolet|
|Chevrolet Nova Cu...|16.0|        6|       250.0|     100.0|3278.0|        null|    Nova|    US| Chevrolet|
|      Chevrolet Vega|21.0|        4|       140.0|      72.0|2401.0|        null|    Vega|    US| Chevrolet|
|      Chevrolet No

In [58]:
spark.stop()

<a id="databases">

## Connecting to Relational Databases (JDBC)

[[Back to Top](#top)]

[Link](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/from_to_dbms.html) to their documentation

#### PostgreSQL

In [None]:
import configparser
import os
import pyspark
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.master("local[*]").appName("Postgres")\
    .config("spark.jars", "C:\\Users\\some_user\\drivers\\jdbc\\postgresql\\postgresql-42.2.23.jar")\
    .getOrCreate()

In [None]:
config_file = os.getenv("CONFIG_PATH")

In [None]:
config = configparser.ConfigParser()
try:
    config.read(config_file)
except ConfigFileNotFound:
    print("config.ini file not found")

In [None]:
# Read in the Postgresql database credentials for DSN-less connection
PG_HOST = config["sc_health_postgres"]["HOST"]
PG_PORT = config["sc_health_postgres"]["PORT"]
PG_DB = config["sc_health_postgres"]["DB"]
PG_USER = config["sc_health_postgres"]["USER"]
PG_PWD = config["sc_health_postgres"]["PWD"]

In [None]:
url = f'jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}'
driver = 'org.postgresql.Driver'

In [None]:
query = "SELECT CURRENT_DATE"

In [None]:
jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", PG_USER) \
    .option("password", PG_PWD) \
    .option("query", query) \
    .load()

In [None]:
jdbcDF.show()

In [None]:
spark.stop()

#### IBM DB2 LUW

In [None]:
spark = SparkSession.builder.master("local[*]").appName("SCOODS")\
    .config("spark.jars", "C:\\Users\\some_user\\drivers\\jdbc\\mainframe\\db2jcc.jar")\
    .getOrCreate()

In [None]:
config_file = os.getenv("CONFIG_PATH")

In [None]:
config = configparser.ConfigParser()
try:
    config.read(config_file)
except ConfigFileNotFound:
    print("config.ini file not found")

In [None]:
# Read in the DB2 LUW credentials for DSN-less connection
SCOODS_HOST = config["scoods_prod"]["HOST"]
SCOODS_PORT = config["scoods_prod"]["PORT"]
SCOODS_DB = config["scoods_prod"]["DB"]
SCOODS_USER = config["scoods_prod"]["USER"]
SCOODS_PWD = config["scoods_prod"]["PWD"]

In [None]:
url = f'jdbc:db2://{SCOODS_HOST}:{SCOODS_PORT}/{SCOODS_DB}:useJDBC4ColumnNameAndLabelSemantics=false;'
driver = 'com.ibm.db2.jcc.DB2Driver'

In [None]:
query = "SELECT CURRENT TIMESTAMP as DATETIME_NOW FROM SYSIBM.SYSDUMMY1"

In [None]:
jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", SCOODS_USER) \
    .option("password", SCOODS_PWD) \
    .option("query", query) \
    .load()

In [None]:
jdbcDF.show(truncate=False)

In [None]:
spark.stop()

#### Microsoft SQL Server

In [None]:
spark = SparkSession.builder.master("local[*]").appName("NAPS")\
    .config("spark.jars", "C:\\Users\\some_user\\drivers\\mssql_jdbc\\mssql-jdbc-9.4.0.jre8.jar")\
    .getOrCreate()

In [None]:
config_file = os.getenv("CONFIG_PATH")

In [None]:
config = configparser.ConfigParser()
try:
    config.read(config_file)
except ConfigFileNotFound:
    print("config.ini file not found")

In [None]:
# Read in the SQL Server database credentials for DSN-less connection
NAPS_HOST = config["naps"]["HOST"]
NAPS_PORT = config["naps"]["PORT"]
NAPS_DB = config["naps"]["DB"]

In [None]:
url = f'jdbc:sqlserver://{NAPS_HOST}:{NAPS_PORT};databaseName={NAPS_DB};integratedSecurity=true'
driver = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'

In [None]:
query = "SELECT * from DimCarrier"

In [None]:
jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("query", query) \
    .load()

In [None]:
jdbcDF.show(truncate=False)

In [None]:
spark.stop()

#### IBM DB2 z/OS

Unfortunately, the IBM Java JRE does not work with Spark 3.x and thus, we will not be able to connect to mainframe DB2 z/OS platform.  Mainframe is too OLD!  Let it rest in peace.

#### Connecting to Snowflake

[Link](https://docs.snowflake.com/en/user-guide/spark-connector-install.html) to Snowflake's documentation on working with the PySpark connector

In [1]:
from pathlib import Path
import configparser
import os
import pyspark
from pyspark.sql import SparkSession

In [2]:
config_file = os.getenv("CONFIG_PATH")

In [3]:
config = configparser.ConfigParser()
try:
    config.read(config_file)
except ConfigFileNotFound:
    print("config.ini file not found")

JDBC driver and Snowflake Spark Connector can be downloaded [here](https://search.maven.org/search?q=g:net.snowflake)

In [4]:
sf_jdbc_driver = config['snowflake']['jdbc_driver_path']
sf_spark_driver = config['snowflake']['spark_driver_path']

In [5]:
sf_account = config['snowflake']['account']
sf_user = config['snowflake']['username']
sf_database = config['snowflake']['database']
sf_schema = config['snowflake']['schema']
sf_role = config['snowflake']['role']
sf_warehouse = config['snowflake']['warehouse']
sf_authenticator = config['snowflake']['authenticator']

In [6]:
spark = (
    SparkSession.builder.master("local[*]")
    .appName("Snowflake_JDBC")
    .config("spark.jars", f"{sf_jdbc_driver},{sf_spark_driver}")
    .getOrCreate()
)

In [7]:
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

In [8]:
# Snowflake connection parameters
sfparams = {
    "sfURL" : f"{sf_account}.snowflakecomputing.com",
    "sfUser" : sf_user,
    "sfPassword" : "your_password",  # Not applicable when using externalbrowser authenticator
    "sfDatabase" : sf_database,
    "sfSchema" : sf_schema,
    "sfRole" : sf_role,
    "sfWarehouse" : sf_warehouse,
    "sfAuthenticator" : sf_authenticator
}

In [9]:
query = "SELECT CURRENT_DATE as my_date"

Snowflake query as PySpark dataframe

In [10]:
#run custom query
df = (
    spark.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfparams)
    .option("query", query)
    .load()
)

In [11]:
df.show()

+----------+
|   MY_DATE|
+----------+
|2022-02-26|
+----------+



Dataframe as Snowflake table

In [12]:
(df
 .select("my_date").write.format(SNOWFLAKE_SOURCE_NAME)
 .options(**sfparams)
 .option("dbtable", "my_table")
 .mode("overwrite")
 .save()
)

In [13]:
spark.stop()