In [4]:
from time import sleep

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

# SparkSession is the entry point for the HIGH-LEVEL API (DataFrames, Spark SQL)
spark = SparkSession. \
    builder. \
    config("spark.sql.autoBroadcastJoinThreshold", 0). \
    appName("Joins"). \
    master("local"). \
    getOrCreate()

In [None]:
movies_df = spark.read. \
    format("json"). \
    option("inferSchema", "true"). \
    load("data/movies")

In [None]:
# 1
# what's wrong with a SinglePartition
# how to add column with row_num() and count()
# read.parquet.count use schema

whole_dataset = Window.partitionBy().orderBy(col("Title").asc_nulls_last())

single_part_df = movies_df.select(col("Title"), row_number().over(whole_dataset))
single_part_df.explain()
# single_part_df.show()


In [None]:
non_single_part_df = movies_df.select(col("Title"), monotonically_increasing_id())
non_single_part_df.explain()
single_part_df.sample(0.1).show()


In [None]:
# 2
# How to read all data from cache?
# Partial caching - cashing only parts which were calculated by some action. That is the couse that part of data
# was from cache the other from source.

partition_of_100_df = spark.range(0, 10000, 1, 100)
partition_of_100_df.cache()


In [None]:
# use only one partition, use only one partition FRACTION CACHE 1% - http://localhost:4040/storage/
# consistence can be uncorrected USE .count to put all data to cache
# deserialized - as Java object, serialized - as Array[Byte]

# partition_of_100_df.show(1)

partition_of_100_df.count()
partition_of_100_df.show(1)


In [None]:
# show data on local disk and disk spil
# InMemoryRelation - load data to cache

partition_of_100_df.explain()
# InMemoryTableScn - load data to cache


In [None]:
# 3 Coalesce vs repartition


# 4 Join optimisation

In [4]:
# dataframe of facts

crime_facts = spark \
    .read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/crimes/crime.csv")

check = crime_facts.cache().count()
assert(check != 0)

In [3]:
# Catalyst optimiser - move filter up

grouped_crime_df = crime_facts.\
    groupBy(col("OFFENSE_CODE")).\
    count().\
    filter(col("OFFENSE_CODE") == 1402)

grouped_crime_df.explain(True)
grouped_crime_df.show()

== Parsed Logical Plan ==
'Filter ('OFFENSE_CODE = 1402)
+- Aggregate [OFFENSE_CODE#17], [OFFENSE_CODE#17, count(1) AS count#68L]
   +- Relation [INCIDENT_NUMBER#16,OFFENSE_CODE#17,OFFENSE_CODE_GROUP#18,OFFENSE_DESCRIPTION#19,DISTRICT#20,REPORTING_AREA#21,SHOOTING#22,OCCURRED_ON_DATE#23,YEAR#24,MONTH#25,DAY_OF_WEEK#26,HOUR#27,UCR_PART#28,STREET#29,Lat#30,Long#31,Location#32] csv

== Analyzed Logical Plan ==
OFFENSE_CODE: int, count: bigint
Filter (OFFENSE_CODE#17 = 1402)
+- Aggregate [OFFENSE_CODE#17], [OFFENSE_CODE#17, count(1) AS count#68L]
   +- Relation [INCIDENT_NUMBER#16,OFFENSE_CODE#17,OFFENSE_CODE_GROUP#18,OFFENSE_DESCRIPTION#19,DISTRICT#20,REPORTING_AREA#21,SHOOTING#22,OCCURRED_ON_DATE#23,YEAR#24,MONTH#25,DAY_OF_WEEK#26,HOUR#27,UCR_PART#28,STREET#29,Lat#30,Long#31,Location#32] csv

== Optimized Logical Plan ==
Aggregate [OFFENSE_CODE#17], [OFFENSE_CODE#17, count(1) AS count#68L]
+- Project [OFFENSE_CODE#17]
   +- Filter (isnotnull(OFFENSE_CODE#17) AND (OFFENSE_CODE#17 = 1402))

In [9]:
# small table with dict data
offense_сodes = spark.\
    read.\
    option("header", "true").\
    option("inferSchema", "true").\
    csv("data/crimes/offense_codes.csv")

offense_сodes.count()

assert(offense_сodes.count() == 576)

offense_сodes.show(100, False)


+----+----------------------------------------------------------+
|CODE|NAME                                                      |
+----+----------------------------------------------------------+
|612 |LARCENY PURSE SNATCH - NO FORCE                           |
|613 |LARCENY SHOPLIFTING                                       |
|615 |LARCENY THEFT OF MV PARTS & ACCESSORIES                   |
|1731|INCEST                                                    |
|3111|LICENSE PREMISE VIOLATION                                 |
|2646|LIQUOR - DRINKING IN PUBLIC                               |
|2204|LIQUOR LAW VIOLATION                                      |
|3810|M/V ACCIDENT - INVOLVING �BICYCLE - INJURY                |
|3801|M/V ACCIDENT - OTHER                                      |
|3807|M/V ACCIDENT - OTHER CITY VEHICLE                         |
|3803|M/V ACCIDENT - PERSONAL INJURY                            |
|3805|M/V ACCIDENT - POLICE VEHICLE                             |
|3802|M/V 

In [None]:
# Sort merge join example
rob_sort_merge_df = crime_facts.\
    join(offense_сodes, col("CODE") == col("OFFENSE_CODE")).\
    filter(col("NAME").startswith("ROBBERY")).\
    groupBy(col("NAME")).\
    count().\
    orderBy(col("count").desc())


rob_sort_merge_df.explain(True)
rob_sort_merge_df.show()

In [10]:
# Broadcast Join Comparing

rob_broadcast_df = crime_facts.\
    join(broadcast(offense_сodes), col("CODE") == col("OFFENSE_CODE")).\
    filter(col("NAME").startswith("ROBBERY")).\
    groupBy(col("NAME")).\
    count().\
    orderBy(col("count").desc())

rob_broadcast_df.explain(True)
rob_broadcast_df.show()

== Parsed Logical Plan ==
'Sort ['count DESC NULLS LAST], true
+- Aggregate [NAME#818], [NAME#818, count(1) AS count#904L]
   +- Filter StartsWith(NAME#818, ROBBERY)
      +- Join Inner, (CODE#817 = OFFENSE_CODE#100)
         :- Relation [INCIDENT_NUMBER#99,OFFENSE_CODE#100,OFFENSE_CODE_GROUP#101,OFFENSE_DESCRIPTION#102,DISTRICT#103,REPORTING_AREA#104,SHOOTING#105,OCCURRED_ON_DATE#106,YEAR#107,MONTH#108,DAY_OF_WEEK#109,HOUR#110,UCR_PART#111,STREET#112,Lat#113,Long#114,Location#115] csv
         +- ResolvedHint (strategy=broadcast)
            +- Relation [CODE#817,NAME#818] csv

== Analyzed Logical Plan ==
NAME: string, count: bigint
Sort [count#904L DESC NULLS LAST], true
+- Aggregate [NAME#818], [NAME#818, count(1) AS count#904L]
   +- Filter StartsWith(NAME#818, ROBBERY)
      +- Join Inner, (CODE#817 = OFFENSE_CODE#100)
         :- Relation [INCIDENT_NUMBER#99,OFFENSE_CODE#100,OFFENSE_CODE_GROUP#101,OFFENSE_DESCRIPTION#102,DISTRICT#103,REPORTING_AREA#104,SHOOTING#105,OCCURRED_ON_DA

# Shared variables

In [5]:
sc = spark.sparkContext
accum = sc.accumulator(0)

sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))

accum.value

10

In [6]:
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value

[1, 2, 3]