In [None]:
pip install pyarrow

In [None]:
%config Completer.use_jedi = False
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
import pandas as pd

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types
from pyspark import StorageLevel
from pyspark.sql import Window

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
train_df = spark.read.csv('train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('test.csv', header=True, inferSchema=True)

# General Studies

# Test 1

# Test 2

In [None]:
### to_date not toDate. f. functions mostely have underscores.
train_df = train_df.withColumn('test_date', f.to_date(f.lit("2021-02-14"),'yyyy-mm-dd'))

In [None]:
### See SQL API for expr usage. approx_count_distinct(name) or count(distinct name)
### https://spark.apache.org/docs/latest/api/sql/index.html
# gdf = train_df.groupBy('Survived').agg(f.expr('count(distinct Sex)'))

In [None]:
### There is no read.view
# df.createOrReplaceTempView('my_view')
# spark.read.table("my_view").show()

In [None]:
### Map vs Flat Map. Both are narrow xforms and lazy eval.
### https://stackoverflow.com/questions/22350722/what-is-the-difference-between-map-and-flatmap-and-a-good-use-case-for-each

In [None]:
# schema = types.StructType([types.StructField('name',types.StringType(),True)])
# schema.add('new_col', types.StringType(),True)

In [None]:
df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80), ('E', None)],["Letter", "Number"])

In [None]:
# df.show()
# df.groupBy().sum().collect()[0][0]
# df.groupBy().sum().collect()

In [None]:
# sc = spark.sparkContext
# ac = sc.accumulator(1)

In [None]:
# df.show()

In [None]:
### df.dropna()
### df.na.drop()
# df.na.drop().show()

In [None]:
### https://stackoverflow.com/questions/44002128/when-are-cache-and-persist-executed-since-they-dont-seem-like-actions
### Cache and repart are lazyily evaluated. cache is not a xform or action, repart is a xform. Repart is wide.
# df.persist(storageLevel=StorageLevel(True,True,True,False,2)).collect()
# df.cache().collect()
# df.storageLevel
# df.unpersist().collect()

In [None]:
### With a managed table, because Spark manages everything, a SQL command such as
### DROP TABLE table_name deletes both the metadata and the data. With an unmanaged
### table, the same command will delete only the metadata, not the actual data.

### views disappear after
### your Spark application terminates.
# spark.catalog.createTable('test') #blank tbl
# spark.catalog.dropTempView('test')
### There is no Drop table method on the catalog property/class - must use sql.
# spark.sql('DROP TABLE test purge')
# df.write.saveAsTable("test", mode='overwrite')
# df.createOrReplaceTempView('temp')

In [None]:
# df.rdd.getNumPartitions()

In [None]:
# df = df.repartition(8)
# # df = df.coalesce(2)
# df.collect()
# df.rdd.getNumPartitions()


In [None]:
# df = df.withColumn('PART_ID',f.spark_partition_id())
# df.select('PART_ID').groupby('PART_ID').count().show()

In [None]:
# df.sample(True, fraction=.5).show()

In [None]:
# df.dtypes

In [None]:
# def squared(x):
#     if x == None:
#         x = 0
#         return x
#     else:
#         return x*x

# spark.udf.register('square', squared)
# spark.sql("SELECT square(`Number`) FROM temp").show()

In [None]:
# df.orderBy(f.desc_nulls_last('Number')).show()

In [None]:
### There is no takeAll - so would not cache right away
# df.take(2)

In [None]:
# df.show(truncate=False)

In [None]:
# def fx(x):
#     print(x)
# print(df.foreach(fx))

In [None]:
# df.first()

In [None]:
# df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80), ('E', None)],["Letter", "Number"])

In [None]:
# df = df.repartition(100)
# # df = df.coalesce(3)
# df.collect()
# df.rdd.getNumPartitions()

In [None]:
# df.write.csv('TEST',mode='overwrite')

In [None]:
# df = df.withColumn('Number', df['Number'].cast(types.StringType()))

In [None]:
# df.select('Number').write.text('test.txt', mode='overwrite')

In [None]:
# df.select('Number').where((f.col('Number') != 0) & (f.col('Number') != 80)).show()

In [None]:
# df = df.withColumn('Number', df['Number'].cast(types.IntegerType()))

In [None]:
# df.show()

In [None]:
# df.groupby('Letter').mean('Number').show()
# df.groupby('Letter').agg(f.mean('Number')).show()

In [None]:
# train_df.withColumn('month', f.date_format('mm',f.col('test_date'))).show()

In [None]:
# train_df.createOrReplaceTempView('train_df')

In [None]:
# def testfx(x):
#     return x*x*x

# spark.udf.register('testfx',testfx)

# spark.sql('SELECT testfx(Pclass) from train_df').show()

In [None]:
### Reduce by is better than group by key because it pre-combines rows on the exectuor.
### Groupby doesn't, it sends all data to one node THEN groups
### https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

In [None]:
### Shuffles: https://stackoverflow.com/questions/26273664/what-are-the-spark-transformations-that-causes-a-shuffle

In [None]:
# test_df = train_df.select(f.split('test_date','-').alias('split_date'))
# test_df.show()

In [None]:
# test_df.select(f.explode('split_date')).show()

In [None]:
# test_df.select(f.posexplode('split_date')).show()

In [None]:
# test_df.select(f.collect_list('split_date')).first()

In [None]:
data = [   ("Ali", 0, [100]),   ("Barbara", 1, [300, 250, 100]),   ("Cesar", 1, [350, 100]),   ("Dongmei", 1, [400, 100]),   ("Eli", 2, [250]),   ("Florita", 2, [500, 300, 100]),   ("Gatimu", 3, [300, 100]) ]
data = [("Ali", 0, [[100]]),   ("Barbara", 1, [[100,200,300],[300, 250, 100]]),   ("Cesar", 1, [[1,2,3],[350, 100]])]
maxpeopleDF = spark.createDataFrame(data).toDF("name", "department", "score")

In [None]:
# peopleDF.show()

In [None]:
# peopleDF.withColumn('score', f.explode(f.col('score'))).show()

In [None]:
# (peopleDF
#  .withColumn('score', f.explode(f.col('score')))
#  .groupby('department')
#  .max('score').show()
#  .withColumnRenamed('max(score)','highest')
# )


In [None]:
# df.rdd.cogroup(df.rdd).collect()

In [None]:
df.sort('Letter').show()

In [None]:
df.join(df.sample(True,.25, 42), 'Letter', how='left').show()

In [None]:
# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a
# Create the pandas UDF for the cubed function
cubed_udf = f.pandas_udf(cubed, returnType=types.IntegerType())

In [None]:
maxpeopleDF.select('*').show(truncate=False)

In [None]:
maxpeopleDF.select(f.col('name'),f.flatten(f.col('score'))).show(truncate=False)

In [None]:
maxpeopleDF.show()

In [None]:
maxpeopleDF.select(f.col('name'),f.explode(f.col('score'))).show()

In [None]:
maxpeopleDF.select(f.collect_list(f.col('score'))).show(truncate=False)

In [None]:
df.groupby('Letter').pivot('Number').agg(f.sum('Number')).show()

In [None]:
# codes = spark.read.csv('airport-codes-na.txt', sep='\t', header=True, inferSchema=True)

In [None]:
# codes.show()

In [None]:
air = spark.read.csv('departuredelays.csv', sep=',', header=True, inferSchema=True)

In [None]:
air.dtypes

In [None]:
gair = air.groupby('origin')

In [None]:
air.rollup('origin','destination').sum('delay', 'distance').sort('origin').show()

In [None]:
air.cube('origin','destination').sum('delay', 'distance').sort('origin').show()

In [None]:
w = Window.partitionBy(f.col('destination')).orderBy(f.col('delay'))

In [None]:
air.select('origin','destination','delay',f.dense_rank().over(w)).show()

In [None]:
w = Window.partitionBy('distance').orderBy('delay')

In [None]:
air.select('origin', 'distance', 'delay', f.dense_rank().over(w)).orderBy('distance', ascending=False).show()

In [None]:
air.dtypes

In [None]:
train_df = train_df.repartition(4)
train_df.rdd.getNumPartitions()

In [None]:
train_df.summary().show()

In [None]:
train_df.describe().show()

In [None]:
train_df.head()

In [None]:
train_df.where((f.col('Pclass').contains('1'))).show()

In [None]:
train_df.agg(f.sum('Fare')).show()

In [None]:
train_df.select(f.sum('Fare')).show()

In [None]:
train_df.select(f.expr('SUM(Fare)')).show()

In [None]:
gb = train_df.groupby('Survived')

In [None]:
gb.

In [None]:
train_df.select(f.explode(f.split('Name', ','))).show(truncate=False)

In [None]:
train_df.select(f.collect_list(f.split('Name', ','))).show(truncate=False)

In [None]:
w = Window.partitionBy('PClass').orderBy('Survived')

In [158]:
train_df.select(f.dense_rank().over(w)).show()

+---------------------------------------------------------------------------------------------+
|DENSE_RANK() OVER (PARTITION BY PClass ORDER BY Survived ASC NULLS FIRST unspecifiedframe$())|
+---------------------------------------------------------------------------------------------+
|                                                                                            1|
|                                                                                            1|
|                                                                                            1|
|                                                                                            1|
|                                                                                            1|
|                                                                                            1|
|                                                                                            1|
|                                       

In [163]:
train_df.select(f.col('test_date').cast(types.TimestampType())).show(truncate=False)

+-------------------+
|test_date          |
+-------------------+
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
|2021-01-14 00:00:00|
+-------------------+
only showing top 20 rows

