<a href="https://colab.research.google.com/github/jlvvlj/deeplearning/blob/master/PySpark_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 63 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 22.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=7f49ab7cb76476ef7608d064e0ab7441528ca207c19f3524e8f23fd1f7aac67e
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
import csv  

header = ['name', 'age']
data = [
    ['Jack', 28],
    ['Paul', 23],
    ['Cindy', 19],
]

with open('people.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)

    # write the header
    writer.writerow(header)

    # write the data
    writer.writerows(data)

In [3]:
import pandas as pd

In [4]:
pd.read_csv('people.csv')

Unnamed: 0,name,age
0,Jack,28
1,Paul,23
2,Cindy,19


In [7]:
from pyspark.sql import SparkSession

In [8]:
spark=SparkSession.builder.appName('Practice').getOrCreate()

In [9]:
spark

In [11]:
df_pyspark=spark.read.csv('people.csv')

In [12]:
df_pyspark.show()

+-----+---+
|  _c0|_c1|
+-----+---+
| name|age|
| Jack| 28|
| Paul| 23|
|Cindy| 19|
+-----+---+



In [13]:
df_pyspark = spark.read.option('header', 'true').csv('people.csv')

In [14]:
df_pyspark.head(3)

[Row(name='Jack', age='28'),
 Row(name='Paul', age='23'),
 Row(name='Cindy', age='19')]

In [15]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)



In [16]:
#read the dataset
df_pyspark=spark.read.option('header', 'true').csv('people.csv', inferSchema=True)

In [17]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [18]:
df_pyspark.show()

+-----+---+
| name|age|
+-----+---+
| Jack| 28|
| Paul| 23|
|Cindy| 19|
+-----+---+



In [19]:
df_pyspark.columns

['name', 'age']

In [20]:
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
| Jack|
| Paul|
|Cindy|
+-----+



In [21]:
df_pyspark.select(['Name', 'Age']).show()

+-----+---+
| Name|Age|
+-----+---+
| Jack| 28|
| Paul| 23|
|Cindy| 19|
+-----+---+



In [22]:
df_pyspark.select('Age').dtypes

[('Age', 'int')]

In [23]:
df_pyspark.describe().show()

+-------+-----+------------------+
|summary| name|               age|
+-------+-----+------------------+
|  count|    3|                 3|
|   mean| null|23.333333333333332|
| stddev| null| 4.509249752822894|
|    min|Cindy|                19|
|    max| Paul|                28|
+-------+-----+------------------+



In [41]:
#adding a column to the dataframe

experiences = [('Jack', 15), ('Paul', 4), ('Cindy', 1)]

experiences_df = spark.createDataFrame(experiences, ['Name', 'Experience'])
data = df_pyspark.join(experiences_df, 'Name')

In [42]:
data.show()

+-----+---+----------+
| name|age|Experience|
+-----+---+----------+
| Jack| 28|        15|
| Paul| 23|         4|
|Cindy| 19|         1|
+-----+---+----------+



In [45]:
#adding a column from existing column to the dataframe
data = data.withColumn('Experience after 2 years', data['Experience'] + 2 )

In [46]:
#remove a column
data = data.drop(data['Experience'])

In [47]:
data.show()

+-----+---+------------------------+
| name|age|Experience after 2 years|
+-----+---+------------------------+
| Jack| 28|                      17|
| Paul| 23|                       6|
|Cindy| 19|                       3|
+-----+---+------------------------+



In [50]:
data = data.withColumnRenamed('Experience after 2 years', 'Work Experience')

In [51]:
data.show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
| Paul| 23|              6|
|Cindy| 19|              3|
+-----+---+---------------+



In [72]:
salaries = [('Jack', 50000),('Paul', 70000),('Cindy', 1000000)]
salaries_df = spark.createDataFrame(salaries, ['Name', 'Salary'])

In [60]:
salaries_df.show()

+-----+-------+
| Name| Salary|
+-----+-------+
| Jack|  50000|
| Paul|  70000|
|Cindy|1000000|
+-----+-------+



In [61]:
data = data.join(salaries_df, 'Name')

In [62]:
data.show()

+-----+---+---------------+-------+-------+
| name|age|Work Experience| Salary| Salary|
+-----+---+---------------+-------+-------+
| Jack| 28|             17|  50000|  50000|
|Cindy| 19|              3|1000000|1000000|
| Paul| 23|              6|  70000|  70000|
+-----+---+---------------+-------+-------+



In [65]:
##drop na values
data.na.drop().show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
|Cindy| 19|              3|
| Paul| 23|              6|
+-----+---+---------------+



In [66]:
##all or any
data.na.drop(how="any").show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
|Cindy| 19|              3|
| Paul| 23|              6|
+-----+---+---------------+



In [67]:
##threshold
data.na.drop(how="any",thresh=3).show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
|Cindy| 19|              3|
| Paul| 23|              6|
+-----+---+---------------+



In [68]:
##Subset
data.na.drop(how="any",subset=['Age']).show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
|Cindy| 19|              3|
| Paul| 23|              6|
+-----+---+---------------+



In [70]:
## Filling the Missing Value
data.na.fill('Missing Values',['Work Experience','age']).show()

+-----+---+---------------+
| name|age|Work Experience|
+-----+---+---------------+
| Jack| 28|             17|
|Cindy| 19|              3|
| Paul| 23|              6|
+-----+---+---------------+



In [6]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 62 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=cd94bdf38e51c4b6403c6e877ecfc675ab91ce74c168e5a53e38aac9e2f701b8
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [7]:
import csv  

header = ['name', 'size']
data = [
    ['Jack', 158],
    ['Paul', 123],
    ['Cindy', 191],
    ['Mary']
]

with open('heights.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)

    # write the header
    writer.writerow(header)

    # write the data
    writer.writerows(data)

In [8]:
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder.appName('Heights').getOrCreate()

In [10]:
spark

In [18]:
heights_df = spark.read.csv('heights.csv',  header=True, inferSchema=True)

In [23]:
data = heights_df

In [24]:
#data = heights_df.na.drop(how='any')

In [25]:
data.show()

+-----+----+
| name|size|
+-----+----+
| Jack| 158|
| Paul| 123|
|Cindy| 191|
| Mary|null|
+-----+----+



In [26]:
#replace missing values with calculated value ( here mean of size is used)

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['size'],
    outputCols=["{}_imputed".format(c) for c in ['size']],
).setStrategy('mean')

In [27]:
imputer.fit(data).transform(data).show()

+-----+----+------------+
| name|size|size_imputed|
+-----+----+------------+
| Jack| 158|         158|
| Paul| 123|         123|
|Cindy| 191|         191|
| Mary|null|         157|
+-----+----+------------+

