In [7]:
from pyspark.sql import SparkSession
spark=SparkSession.builder \
        .appName("demo") \
        .master('local[*]') \
        .getOrCreate()

In [6]:
spark_df=spark.read.csv('data.csv', header=True, inferSchema=True)
spark

# basics
- pyspark dataframe
- reading it
- checking schema
- selecting and indexing
- describing it similar to pandas
- adding, renaming and dropping columns

In [None]:
import pandas as pd
pd.read_csv('./data.csv')

Unnamed: 0,name,age
0,krish,31
1,sudansh,30
2,sunny,29


In [None]:
spark_df.head(10)
spark_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [None]:
spark_df.show()

+-------+---+
|   name|age|
+-------+---+
|  krish| 31|
|sudansh| 30|
|  sunny| 29|
+-------+---+



In [None]:
spark_df.select(['name', 'age']).show()

+-------+---+
|   name|age|
+-------+---+
|  krish| 31|
|sudansh| 30|
|  sunny| 29|
+-------+---+



In [None]:
type(spark_df['name'])

pyspark.sql.column.Column

In [None]:
spark_df.dtypes

[('name', 'string'), ('age', 'int')]

In [None]:
spark_df.describe().show()

+-------+-----+----+
|summary| name| age|
+-------+-----+----+
|  count|    3|   3|
|   mean| null|30.0|
| stddev| null| 1.0|
|    min|krish|  29|
|    max|sunny|  31|
+-------+-----+----+



In [None]:
spark_df=spark_df.withColumn('Age after 2 years', spark_df['age']+2)
spark_df=spark_df.drop('Age after 2 years')

In [None]:
spark_df.withColumnRenamed('name', 'new name').show()

+--------+---+
|new name|age|
+--------+---+
|   krish| 31|
| sudansh| 30|
|   sunny| 29|
+--------+---+



# handling missing values
- dropping columns and rows
- handling missing values by mean, median and mode

In [None]:
spark_df.show()

+-------+----+
|   name| age|
+-------+----+
|  krish|  31|
|sudansh|  30|
|  sunny|  29|
|   paul|  29|
| harsha|  21|
|   null|  34|
|   null|  30|
| mahesh|null|
+-------+----+



In [None]:
spark_df.na.drop().show()

+-------+---+
|   name|age|
+-------+---+
|  krish| 31|
|sudansh| 30|
|  sunny| 29|
|   paul| 29|
| harsha| 21|
+-------+---+



In [None]:
spark_df.na.drop(how='all').show()

+-------+----+
|   name| age|
+-------+----+
|  krish|  31|
|sudansh|  30|
|  sunny|  29|
|   paul|  29|
| harsha|  21|
|   null|  34|
|   null|  30|
| mahesh|null|
+-------+----+



In [None]:
from pyspark.sql.functions import floor
spark_df=spark_df.withColumn('fake xp', floor(spark_df['age']/2))

In [None]:
spark_df.show()

+-------+----+-------+
|   name| age|fake xp|
+-------+----+-------+
|  krish|  31|     15|
|sudansh|  30|     15|
|  sunny|  29|     14|
|   paul|  29|     14|
| harsha|  21|     10|
|   null|  34|     17|
|   null|  30|     15|
| mahesh|null|   null|
+-------+----+-------+



In [None]:
spark_df.na.drop(how='any', thresh=2).show() #if there's at least 2 non-null values (threash)

+-------+---+-------+
|   name|age|fake xp|
+-------+---+-------+
|  krish| 31|     15|
|sudansh| 30|     15|
|  sunny| 29|     14|
|   paul| 29|     14|
| harsha| 21|     10|
|   null| 34|     17|
|   null| 30|     15|
+-------+---+-------+



In [None]:
spark_df.na.drop(how='any', subset=['name']).show()

+-------+----+-------+
|   name| age|fake xp|
+-------+----+-------+
|  krish|  31|     15|
|sudansh|  30|     15|
|  sunny|  29|     14|
|   paul|  29|     14|
| harsha|  21|     10|
| mahesh|null|   null|
+-------+----+-------+



In [None]:
spark_df.na.fill('missing', ['age']).show() #?

+-------+----+-------+
|   name| age|fake xp|
+-------+----+-------+
|  krish|  31|     15|
|sudansh|  30|     15|
|  sunny|  29|     14|
|   paul|  29|     14|
| harsha|  21|     10|
|   null|  34|     17|
|   null|  30|     15|
| mahesh|null|   null|
+-------+----+-------+



In [None]:
from pyspark.ml.feature import Imputer

imp=Imputer(
    inputCols=['age', 'fake xp'],
    outputCols=[f'{col}_imputed' for col in ['age', 'fake xp']]
).setStrategy('mean') #mean, median, mode

In [None]:
spark_df.describe().show()

+-------+------+------------------+------------------+
|summary|  name|               age|           fake xp|
+-------+------+------------------+------------------+
|  count|     6|                 7|                 7|
|   mean|  null|29.142857142857142|14.285714285714286|
| stddev|  null|3.9761191895520196| 2.138089935299395|
|    min|harsha|                21|                10|
|    max| sunny|                34|                17|
+-------+------+------------------+------------------+



In [None]:
imp.fit(spark_df).transform(spark_df).show()

+-------+----+-------+-----------+---------------+
|   name| age|fake xp|age_imputed|fake xp_imputed|
+-------+----+-------+-----------+---------------+
|  krish|  31|     15|         31|             15|
|sudansh|  30|     15|         30|             15|
|  sunny|  29|     14|         29|             14|
|   paul|  29|     14|         29|             14|
| harsha|  21|     10|         21|             10|
|   null|  34|     17|         34|             17|
|   null|  30|     15|         30|             15|
| mahesh|null|   null|         29|             14|
+-------+----+-------+-----------+---------------+



# dataframes filter ops
- filter
- `&`, `|`, `==` e `~`

In [None]:
names=[row.name for row in spark_df.select('name').collect()]

import random
a=spark.createDataFrame(
    [(random.randint(2000, 10000), name) for name in names], 'salary int, name string')

#add fake salary
spark_df=spark_df.join(a, on='name', how='full')

In [None]:
spark_df.orderBy('salary').show()

+-------+----+------+
|   name| age|salary|
+-------+----+------+
|   NULL|  34|  NULL|
|   NULL|  30|  NULL|
| mahesh|NULL|  2502|
|   NULL|NULL|  4602|
|  sunny|  29|  4808|
|sudansh|  30|  4964|
|   paul|  29|  6264|
|   NULL|NULL|  6573|
|  krish|  31|  9580|
| harsha|  21|  9987|
+-------+----+------+



In [None]:
spark_df.filter('salary<=3000').select(['name', 'age']).show()

+------+----+
|  name| age|
+------+----+
|mahesh|NULL|
+------+----+



In [None]:
spark_df.filter(
    (spark_df['salary']>=4000) & (spark_df['salary']<=7000)).select(['name', 'age']).show()

+-------+----+
|   name| age|
+-------+----+
|sudansh|  30|
|  sunny|  29|
|   paul|  29|
|   NULL|NULL|
|   NULL|NULL|
+-------+----+



In [None]:
spark_df.filter(~(spark_df['salary']>=5000)).select(['name', 'age']).show() #not

+-------+----+
|   name| age|
+-------+----+
|sudansh|  30|
|  sunny|  29|
|   NULL|NULL|
| mahesh|NULL|
+-------+----+



# Group by and Aggregated Functions

In [None]:
spark_df.groupBy('age').sum('salary').show()
                    #  .count()
                    #  .avg()
                    #  .max()
                    #  etc

+----+-----------+
| age|sum(salary)|
+----+-----------+
|  31|       9580|
|  34|       NULL|
|NULL|      13677|
|  29|      11072|
|  21|       9987|
|  30|       4964|
+----+-----------+

