In [3]:
# Create (or reuse) the Spark session used by every cell below.
# 'spark.jars' / 'spark.driver.extraClassPath' are static settings that only take
# effect when the session is first created, so the PostgreSQL JDBC driver jar used
# by the database cells later in this notebook is registered here, up front.
import os

from pyspark.sql import SparkSession

POSTGRES_JAR = 'bin/postgresql-42.2.16.jar'

_session_builder = SparkSession.builder
if os.path.exists(POSTGRES_JAR):
    # Only register the jar when it is present, so the RDD/DataFrame examples
    # still run on machines that don't have the driver downloaded.
    _session_builder = _session_builder.config(
        'spark.jars', POSTGRES_JAR
    ).config(
        'spark.driver.extraClassPath', POSTGRES_JAR
    )
session = _session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/13 21:07:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Create and Interact with RDD

In [6]:
# Distribute the integers 1, 2, 3 as a Resilient Distributed Dataset (RDD)
rdd = session.sparkContext.parallelize(list(range(1, 4)))

In [9]:
# Fetch only the first two elements back to the driver
rdd.take(2)

[1, 2]

In [8]:
# Number of elements in the RDD (an action, so it runs a Spark job)
rdd.count()

                                                                                

3

In [10]:
# Pull every element of the RDD into a Python list on the driver
# (only safe when the data fits in driver memory)
rdd.collect()

[1, 2, 3]

### DataFrames

In [13]:
# Build a two-row DataFrame with three explicitly named columns
df = session.createDataFrame(
    data=[[1, 2, 3], [4, 5, 6]],
    schema=['column1', 'column2', 'column3'],
)

In [14]:
# Print at most three rows of the DataFrame as a text table
df.show(3)

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|      1|      2|      3|
|      4|      5|      6|
+-------+-------+-------+



### DataFrame Manipulation – User-Defined Functions (UDFs)

In [15]:
import pyspark.sql.functions as funcs
import pyspark.sql.types as types
def multiply_by_ten(number):
    """Scale one column value by ten; intended to be wrapped as a Spark UDF.

    Args:
        number: the numeric cell value. Null column values reach a Python UDF
            as ``None``.

    Returns:
        ``number * 10.0`` (a float), or ``None`` when ``number`` is ``None`` so
        that null cells stay null instead of raising ``TypeError`` and failing
        the whole Spark job.
    """
    if number is None:
        return None
    return number * 10.0
# Wrap the Python function as a Spark UDF that returns DoubleType, then append
# its result for column1 as a new 'multiplied' column and print the frame
multiply_udf = funcs.udf(multiply_by_ten, returnType=types.DoubleType())
transformed_df = df.withColumn('multiplied', multiply_udf('column1'))
transformed_df.show()

                                                                                

+-------+-------+-------+----------+
|column1|column2|column3|multiplied|
+-------+-------+-------+----------+
|      1|      2|      3|      10.0|
|      4|      5|      6|      40.0|
+-------+-------+-------+----------+



### RDD Mapping

In [18]:
import pyspark.sql.types as types
import math
def take_log_in_all_columns(row: types.Row):
    """Return a new Row holding the natural log of every field of ``row``.

    Each input field ``name`` becomes ``log(name)`` in the output, in the same
    order. ``None`` and non-positive values map to ``None`` instead of raising:
    ``math.log`` raises ``TypeError`` on ``None`` and ``ValueError`` on values
    <= 0, which would fail the whole Spark job. Spark SQL's own ``log``/``ln``
    likewise returns NULL for inputs <= 0.

    NOTE(review): assumes every field of ``row`` is numeric -- confirm before
    reusing this on other DataFrames.
    """
    def _safe_log(value):
        # No real logarithm exists for null or non-positive input; emit null.
        if value is None or value <= 0:
            return None
        return math.log(value)

    old_row = row.asDict()
    new_row = {f'log({column_name})': _safe_log(value)
               for column_name, value in old_row.items()}
    return types.Row(**new_row)

In [19]:
# Apply the row-wise log transform on the underlying RDD, then turn it back into a DataFrame
logarithmic_dataframe = (
    df.rdd
    .map(take_log_in_all_columns)
    .toDF()
)

In [21]:
# Print (up to the default 20) rows of the log-transformed frame
logarithmic_dataframe.show(n=20)

+------------------+------------------+------------------+
|      log(column1)|      log(column2)|      log(column3)|
+------------------+------------------+------------------+
|               0.0|0.6931471805599453|1.0986122886681098|
|1.3862943611198906|1.6094379124341003| 1.791759469228055|
+------------------+------------------+------------------+



### SQL Operations

In [23]:
# SELECT column1, column2 -- lazy: this only builds a new DataFrame
df.select(['column1', 'column2'])

DataFrame[column1: bigint, column2: bigint]

In [24]:
# WHERE column1 = 3 -- `filter` is an alias of `where`
# (with the example rows above, no row satisfies this predicate)
df.filter('column1 = 3')

DataFrame[column1: bigint, column2: bigint, column3: bigint]

In [None]:
# INNER JOIN
# Build a second DataFrame that shares the `column1` key so this cell is
# self-contained and runs on a fresh kernel.
labels_df = session.createDataFrame(
    [[1, 'first'], [4, 'second']], ['column1', 'label']
)
df.join(labels_df, ['column1'], how='inner')

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# (Plain ASCII quotes are required here; typographic quotes are a SyntaxError.)
df.createOrReplaceTempView("table1")

# Perform query on top of view
df2 = session.sql("SELECT column1 AS f1, column2 as f2 from table1")

### DataFrame Column Operations

In [27]:
# derived_column = column1 + (column2 * column3); multiplication binds tighter
df3 = df.withColumn(
    'derived_column',
    df.column1 + df.column2 * df.column3,
)

### Aggregations and Quick Statistics

In [29]:
# Column names for data/adult.data.csv, in file order; the CSV is read with
# header=False, so these names are applied positionally.
ADULT_COLUMN_NAMES = """
    age workclass fnlwgt education education_num marital_status
    occupation relationship race sex capital_gain capital_loss
    hours_per_week native_country income
""".split()

In [None]:
# Load the headerless CSV and apply ADULT_COLUMN_NAMES positionally in a single
# projection. toDF(*names) raises if the file's column count differs from the
# number of names, instead of zip() silently leaving columns unrenamed, and it
# avoids chaining one withColumnRenamed projection per column.
ADULT_CSV_PATH = 'data/adult.data.csv'
csv_df = session.read.csv(
    ADULT_CSV_PATH, header=False, inferSchema=True
).toDF(*ADULT_COLUMN_NAMES)

In [None]:
# Print summary statistics for the columns of the loaded CSV
csv_df.describe().show(n=20)

In [None]:
# Per age: mean and sample standard deviation of weekly working hours, ordered by age
work_hours_df = (
    csv_df
    .groupBy('age')
    .agg(funcs.avg('hours_per_week'), funcs.stddev_samp('hours_per_week'))
    .orderBy('age')
)

### Connecting to a Database

In [30]:
# Request a session that carries the PostgreSQL JDBC driver jar.
# 'spark.jars' and 'spark.driver.extraClassPath' are static settings: if a session is
# already running, getOrCreate() returns it unchanged and Spark only logs
# "Using an existing Spark session; only runtime SQL configurations will take effect."
# In that case the jar is NOT loaded and the JDBC cells below are expected to fail,
# so check explicitly and warn instead of failing silently later.
import os
import warnings

POSTGRES_JAR = 'bin/postgresql-42.2.16.jar'

session = SparkSession.builder.config(
    'spark.jars', POSTGRES_JAR
).config(
    'spark.driver.extraClassPath', POSTGRES_JAR
).getOrCreate()

_configured_jars = session.sparkContext.getConf().get('spark.jars') or ''
if os.path.basename(POSTGRES_JAR) not in _configured_jars:
    warnings.warn(
        f"{POSTGRES_JAR} is not on the running SparkContext (the session already "
        "existed); restart the kernel and set 'spark.jars' / "
        "'spark.driver.extraClassPath' when the session is first created."
    )

22/07/13 21:23:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
22/07/13 22:14:10 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 399268 ms exceeds timeout 120000 ms
22/07/13 22:14:10 WARN SparkContext: Killing executors is not supported by current scheduler.


In [None]:
import os

# Connection settings come from environment variables so real credentials are never
# written into the notebook; the "your_*" values are only placeholder fallbacks.
PG_HOST = os.environ.get('PG_HOST', 'your_host_ip')
PG_PORT = os.environ.get('PG_PORT', '5432')
PG_DATABASE = os.environ.get('PG_DATABASE', 'your_database')

url = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
properties = {
    'user': os.environ.get('PG_USER', 'your_user'),
    'password': os.environ.get('PG_PASSWORD', 'your_password'),
}
# read from a table into a dataframe
# NOTE: this rebinds `df`, replacing the small example DataFrame created earlier.
df = session.read.jdbc(
    url=url, table='your_table_name', properties=properties
)

In [None]:
# Append the UDF-transformed rows to `new_table` using the same JDBC connection settings
transformed_df.write.mode('append').jdbc(
    url=url,
    table='new_table',
    properties=properties,
)