<a href="https://colab.research.google.com/github/ivmarchuk/pyspark-learn/blob/main/pyspark_learn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [29]:
import pyspark.sql.types as t # for data types conversion
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [31]:
# Get the active SparkSession, creating one if none exists yet.
# Connector settings (S3, Google Cloud Storage, ...) can be added to the
# builder with .config(...) before calling getOrCreate().
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.3.0

Master
local[*]

AppName
pyspark-shell

In [3]:
# Read the cars CSV into a DataFrame.
# header=True: the first row holds the column names.
# inferSchema=True: Spark scans the data to assign numeric/boolean types instead
# of reading every column as a string (costs one extra pass over the file, but
# avoids lexicographic min/max and implicit string->double casts later on).
df = spark.read.csv('/data-for-spark/cars.csv', header=True, inferSchema=True)

In [None]:
# Print each row as a vertical block of "column | value" lines, which is far
# easier to read than a horizontal table when the DataFrame is wide.
df.show(n=20, truncate=True, vertical=True)

In [None]:
# Preview two columns (first 10 rows); plain column names work as well as F.col(...).
df.select('manufacturer_name', 'model_name').show(n=10)

In [None]:
# Filter rows: cars with a mechanical transmission from a single manufacturer.
# Column expressions (F.col(...) == value) let the filter values come from
# variables instead of being spliced into a SQL string such as
# .filter('manufacturer_name = "Audi"').
manufacturer = 'Subaru'
transmission = 'mechanical'

(
    df
    .select('manufacturer_name', 'model_name', 'transmission')
    .filter(
        (F.col('manufacturer_name') == manufacturer)
        & (F.col('transmission') == transmission)
    )
    .show(10)
)

In [None]:
# Count distinct manufacturers (F already exposes countDistinct, no extra import).
# Related counts: df.count() gives all rows, df.distinct().count() distinct rows.
df.select(F.countDistinct('manufacturer_name')).show()

In [None]:
# Number of cars per manufacturer, most frequent first (top 10).
manufacturer_counts = df.groupBy('manufacturer_name').count()
manufacturer_counts.orderBy(F.desc('count')).show(10)

In [None]:
# Rename a column: aliasing it inside select() yields the same single 'firm'
# column as withColumnRenamed(...) followed by select('firm').
df.select(F.col('manufacturer_name').alias('firm')).show(10)

In [None]:
# Add a derived column: the production year shifted YEARS_AHEAD years forward.
# The explicit integer cast keeps the result an integer even if year_produced
# was read from the CSV as a string (string + int would be promoted to double).
YEARS_AHEAD = 5
shifted_col = f'year_plus_{YEARS_AHEAD}'

(
    df
    .withColumn(shifted_col, F.col('year_produced').cast(t.IntegerType()) + YEARS_AHEAD)
    .select('year_produced', shifted_col)
    .show(10)
)

In [None]:
# Print the DataFrame schema as a tree: each column name with its data type.
df.printSchema()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the two chosen columns.
df.describe('manufacturer_name', 'year_produced').show()

In [None]:
# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
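# ETL outline for the job below:
# extract   - read the cars CSV
# transform - compute per-manufacturer metrics
# load      - write the result (as JSON)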

In [35]:
def main(
    input_path='/data-for-spark/cars.csv',
    output_path='/data-for-spark/output_cars.json',
):
  """Aggregate per-manufacturer car statistics from a CSV and save them as JSON.

  Reads ``input_path`` (CSV with a header row) and, for every
  ``manufacturer_name``, computes:

  * ``count``        -- number of cars (rows) for the manufacturer,
  * ``average_year`` -- mean ``year_produced``, rounded to an integer,
  * ``min_price``    -- lowest ``price_usd`` (float),
  * ``max_price``    -- highest ``price_usd`` (float).

  The result is written in JSON format to ``output_path``, replacing any
  existing output there.

  Args:
    input_path: location of the source CSV file.
    output_path: destination for the JSON output; Spark writes a directory
      with one part file per partition.
  """
  # Reuses the active session if there is one.
  spark = SparkSession.builder.getOrCreate()

  # No inferSchema here, so every column arrives as a string.
  df = spark.read.format('csv').option('header', 'true').load(input_path)

  # Cast the price BEFORE min/max: aggregating the raw string column compares
  # values lexicographically (e.g. '9000' would be larger than '10000').
  price = F.col('price_usd').cast(t.FloatType())

  output = (
      df
      .groupBy('manufacturer_name')
      .agg(
          F.count('manufacturer_name').alias('count'),
          F.round(F.avg('year_produced')).cast(t.IntegerType()).alias('average_year'),
          F.min(price).alias('min_price'),
          F.max(price).alias('max_price'),
      )
  )

  # mode() and format() are separate DataFrameWriter calls; without format('json')
  # Spark would write its default format (parquet).
  # For small results, output.coalesce(1) before .write yields a single part file.
  output.write.mode('overwrite').format('json').save(output_path)


In [36]:
# Run the job and always stop the Spark session afterwards, even if it fails,
# so its resources are released. main() uses getOrCreate(), which reuses the
# active session bound to `spark` above.
try:
  main()
finally:
  spark.stop()

In [None]:
# To run this job outside the notebook:
# 1. Put the script (imports, main() and a call to it) into a .py file.
# 2. From the activated venv, submit it with: spark-submit script.py