## Data Wrangling with PySpark for Data Scientists Who Know Pandas
[Data Wrangling with PySpark for Data Scientists Who Know Pandas - Andrew Ray](https://www.youtube.com/watch?v=XrpSRCwISdk&t=10s&ab_channel=Databricks)

and [Azure Databricks using Python with PySpark](https://www.youtube.com/watch?v=qYis56u8w4U&list=PLyvglJJUQCYE_M4W65MxNynWVm73SH9R4&index=8&t=1725s&ab_channel=BryanCafferky)

## What do I get with PySpark?
Apache Spark is a fast and general engine for large-scale data processing.

YOU DON'T BRING THE DATA TO THE PROGRAM, YOU BRING THE PROGRAM TO THE DATA

Gain
- Work with big data
- Native SQL
- Decent documentation

Lose
- Amazing documentation
- Easy plotting
- Indices

### Primer
Distributed compute
- YARN, Mesos, Standalone cluster

Abstractions
- RDD -- distributed collection of objects
- DataFrame -- distributed dataset of tabular data
    - Integrated SQL
    - ML algorithms

### Important concepts
Immutable
- Changes create new object references
- Old versions are unchanged

Lazy
- Compute does not happen until output is requested
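
The two ideas above can be sketched in plain Python. This is only an analogy, not Spark code: `LazyFrame`, `with_column`, `collect` and `gpm` are hypothetical names invented for this illustration. Each transformation returns a new object and only records a step; nothing runs until output is requested.

```python
class LazyFrame:
    """Toy stand-in for a Spark DataFrame: immutable and lazily evaluated."""

    def __init__(self, rows, steps=()):
        self._rows = rows            # source data, never modified
        self._steps = tuple(steps)   # recorded transformations (the "plan")

    def with_column(self, name, fn):
        # Return a NEW LazyFrame with one more recorded step; self is unchanged.
        step = lambda row: {**row, name: fn(row)}
        return LazyFrame(self._rows, self._steps + (step,))

    def collect(self):
        # Only here does any computation actually happen.
        out = []
        for row in self._rows:
            for step in self._steps:
                row = step(row)
            out.append(row)
        return out


calls = []  # records every time gpm() really runs


def gpm(row):
    calls.append(row["mpg"])
    return 1 / row["mpg"]


cars = LazyFrame([{"mpg": 20.0}, {"mpg": 25.0}])
cars_gpm = cars.with_column("gpm", gpm)  # recorded, not executed

print(len(calls))          # how many times gpm has run so far
print(cars.collect())      # the original frame has no gpm column
print(cars_gpm.collect())  # the work happens when output is requested
```

In PySpark the same pattern shows up as `df2 = df.withColumn(...)` doing no work until an action such as `df2.show()` is called.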

### Cluster Architecture
- The driver runs the user's main function and executes the various parallel operations on the worker nodes
- The results of the operations are collected by the driver
- The worker nodes read and write data from/to data sources, including HDFS and SQL databases
- Worker nodes also cache transformed data in memory as RDDs (Resilient Distributed Datasets)
- Worker nodes and the driver node can run as VMs in public clouds (AWS, Google and Azure)
- Each node runs a JVM
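
The driver/worker split can be sketched as a single-machine analogy with Python's standard library. This is not how Spark is implemented: threads stand in for worker nodes, and the names `partition`, `worker_task` and `driver` are hypothetical, chosen only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, n):
    """Split data into n chunks, like the partitions of a distributed dataset."""
    return [data[i::n] for i in range(n)]


def worker_task(chunk):
    """Work done on one partition: here, a partial sum of squares."""
    return sum(x * x for x in chunk)


def driver(data, n_workers=4):
    """The 'driver' hands partitions to workers and combines their results."""
    chunks = partition(data, n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(worker_task, chunks))
    return sum(partials)


total = driver(list(range(10)))
print(total)
```

The point of the sketch is the shape of the computation: the program is sent to where each chunk of data lives, and only the small partial results travel back to the driver.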

### Loading files

In [None]:
# Pandas
import pandas as pd
df = pd.read_csv("path")

# PySpark (`spark` is the SparkSession; Databricks notebooks create it for you)
df = spark.read.options(header=True, inferSchema=True).csv("path")
# or
df = spark.read.csv("path", header=True, sep=',', inferSchema=True)
# or, for Parquet files
df = spark.read.format("parquet").load("path")

### Writing files

In [None]:
# PySpark
df.write.format("parquet").save("path")
# or, equivalently
df.write.parquet("path")

### View Dataframe

In [None]:
# Pandas
df
df.head(10)

# PySpark
df.show()
df.show(10)

### Columns and data types

In [None]:
# Pandas and PySpark

df.columns
df.dtypes

### Rename columns

In [None]:
# Pandas
df.columns = ['a', 'b', 'c']
df.rename(columns={'old': 'new'})

#PySpark
df.toDF('a', 'b', 'c')
df.withColumnRenamed('old', 'new')

#or
df1 = df.selectExpr("a", 
                    "b as new_b",
                    "c as new_c")
#or
df = df.select(df['c1'].alias('a'),
              df['c2'].alias('b'))

### Drop column

In [None]:
# Pandas
df.drop('mpg', axis=1)

#PySpark
df.drop('mpg')

### Filtering

In [None]:
# Pandas and PySpark

df[df.mpg < 30]
df[(df.mpg < 30) & (df.cyl > 6)]

### Add Column

In [None]:
# Pandas 1/0 = inf
df['gpm'] = 1 / df.mpg

# PySpark 1/0 = null (with ANSI mode off; with ANSI mode on, division by zero raises an error)
df.withColumn('gpm', 1 / df.mpg)

### Fill nulls

In [None]:
# Pandas
df.fillna(0) # <-- Many more options

#PySpark
df.fillna(0)

### Aggregation

In [None]:
# Pandas and PySpark (function names passed as strings work in both)
df.groupby(['cyl', 'gear']).agg({'mpg': 'mean', 'disp': 'min'})

### Value Counts

In [None]:
# Pandas
df['col'].value_counts()

#PySpark
df.groupBy(df['col']).count().orderBy('count', ascending=False)

### Standard Transformations

In [None]:
# Pandas
import numpy as np
df['logdisp'] = np.log(df.disp)

#PySpark
import pyspark.sql.functions as F
df.withColumn('logdisp', F.log(df.disp))

### Row conditional statements

In [None]:
# Pandas
df['cond'] = df.apply(lambda r: 1 if r.mpg > 20 else 2 if r.cyl == 6 else 3, axis=1)

# PySpark
df.withColumn('cond', F.when(df.mpg > 20, 1).when(df.cyl == 6, 2).otherwise(3))

### Types

In [None]:
# Pandas
df.dtypes

# PySpark
df.schema
df.printSchema()

# you can change type with .astype()
df = df.withColumn('Age2', df['Age'].astype("float"))
df = df.drop('Age')
df.schema

### Python when required
#### Pushing work to the nodes with User Defined Functions (UDFs)

In [None]:
# Pandas
df['disp1'] = df.disp.apply(lambda x: x+1)

# PySpark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

fn = F.udf(lambda x: x+1, DoubleType())  # user-defined function with its return type
df.withColumn('disp1', fn(df.disp))

# or
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# without an explicit return type, udf defaults to StringType
yearssinceminor = udf(lambda age: age - 16, IntegerType())
new_df = df.withColumn("yearssinceminor", yearssinceminor(df.age))

### Merge/Join dataframes

In [None]:
# Pandas
left.merge(right, on='key')
# if the key has a different name on each side
left.merge(right, left_on='a', right_on='b')

# PySpark
left.join(right, on='key')
# if the key has a different name on each side
left.join(right, left.a == right.b)

### Pivot table

In [None]:
# Pandas
pd.pivot_table(df, values='D', index=['A', 'B'], columns=['C'], aggfunc='sum')

# PySpark
df.groupBy('A', 'B').pivot('C').sum('D')

### Summary statistics

In [None]:
# Pandas
df.describe()

# PySpark
df.describe().show() #(only count, mean, stddev, min, max)
# quartiles
df.selectExpr(
        "percentile_approx(mpg, array(.25, .5, .75)) as mpg").show()

In [None]:
import pyspark.sql.functions as F  # avoids shadowing Python's built-in min and max
df.select(F.mean('age'), F.min('age'), F.max('age')).show()

### Histogram

In [None]:
# Pandas 
df.hist()

#PySpark
df.sample(False, 0.1).toPandas().hist()

### SQL

#### Create a temporary view from our Spark DataFrame

In [None]:
# PySpark 
df.createOrReplaceTempView('foo')
df2 =  spark.sql('select * from foo')

#### Saving a dataframe to a permanent managed table

In [None]:
df.write.saveAsTable("foo2")

In [None]:
%sql
-- ORDER BY gives a total ordering; SORT BY only sorts within each partition
select one, two from foo order by three

### Use SQL to create a python spark dataframe

In [None]:
df = spark.sql(" select * from diabetes where diabetes = 1")

## Make sure to
- Use pyspark.sql.functions and other built-in functions
- Use the same version of python and packages on cluster as driver
- Check out the UI at http://localhost:4040/
- Learn about SSH port forwarding
- Check out Spark MLlib
- RTFM: https://spark.apache.org/docs/latest

## Things not to do
- Try to iterate through rows
- Hard code a master in your driver
    - Use spark-submit for that
- df.toPandas().head()
    - instead do: df.limit(5).toPandas()
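
Why the order of `limit` and `toPandas` matters can be sketched with a plain-Python analogy. This is not Spark code: `remote_rows` is a hypothetical stand-in for a remote data source, and the log lists simply count how many rows get shipped to the caller.

```python
from itertools import islice


def remote_rows(n, log):
    """Pretend data source: yields rows one by one and logs each row shipped."""
    for i in range(n):
        log.append(i)
        yield {"id": i}


# Anti-pattern: pull everything, then keep 5 (like df.toPandas().head()).
shipped_all = []
head_all = list(remote_rows(1000, shipped_all))[:5]

# Preferred: limit first, then pull (like df.limit(5).toPandas()).
shipped_limited = []
head_limited = list(islice(remote_rows(1000, shipped_limited), 5))

# Same five rows either way, but very different amounts of data moved.
print(len(shipped_all), len(shipped_limited))
```

Both approaches return the same rows; the difference is how much data has to reach the driver before the first five are kept.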

## If things go wrong
- Google it
- Search/Ask Stack Overflow (tag apache-spark)