# Get Started with PySpark

## Packages

In [1]:
from pyspark.sql import SparkSession

## Paths & Config

In [2]:
DATA = 'data/iris.csv'

## Initialization

In [3]:
spark = SparkSession.builder.getOrCreate()

## Read data

In [4]:
spark_df = spark.read\
                .option('header', True)\
                .csv(DATA, sep=',')
spark_df.show(10)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 10 rows



## Data Exploration

In [9]:
spark_df.dtypes

[('sepal_length', 'string'),
 ('sepal_width', 'string'),
 ('petal_length', 'string'),
 ('petal_width', 'string'),
 ('species', 'string')]

In [10]:
spark_df.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)



## Split data

In [5]:
train, test = spark_df.randomSplit(weights=(0.8, 0.2))

In [6]:
train.show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.3|        3.0|         1.1|        0.1|    setosa|
|         4.4|        3.0|         1.3|        0.2|    setosa|
|         4.5|        2.3|         1.3|        0.3|    setosa|
|         4.6|        3.1|         1.5|        0.2|    setosa|
|         4.6|        3.2|         1.4|        0.2|    setosa|
|         4.6|        3.4|         1.4|        0.3|    setosa|
|         4.6|        3.6|         1.0|        0.2|    setosa|
|         4.7|        3.2|         1.3|        0.2|    setosa|
|         4.8|        3.0|         1.4|        0.1|    setosa|
|         4.8|        3.0|         1.4|        0.3|    setosa|
|         4.8|        3.4|         1.6|        0.2|    setosa|
|         4.8|        3.4|         1.9|        0.2|    setosa|
|         4.9|        2.4|         3.3|        1.0|vers

## Transform data

### Normalization

In [7]:
from pyspark.ml.feature import StandardScaler

In [8]:
features = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
for feature in features:
    scaler = StandardScaler(inputCol=feature, outputCol=feature + "_scaled") 
    scaled = scaler.fit(spark_df).transform(spark_df)
    scaled = scaled.drop(feature)
scaled.show()

IllegalArgumentException: requirement failed: Column sepal_length must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.StringType$:string.

In [12]:
scaler = StandardScaler(inputCol=("sepal_length", "sepal_width", "petal_length", "petal_width"), 
                        outputCol=("sepal_length_scaled", "sepal_width_scaled", "petal_length_scaled", "petal_width_scaled")) 
scaled = scaler.fit(spark_df).transform(spark_df)
scaled = scaled.drop(*("sepal_length", "sepal_width", "petal_length", "petal_width"))
scaled.show()

TypeError: Invalid param value given for param "inputCol". Could not convert <class 'tuple'> to string type

### Label Encoding

In [7]:
from pyspark.ml.feature import StringIndexer

In [9]:
encoder = StringIndexer(inputCol="species", outputCol="speciesIndex") 
indexed = encoder.fit(spark_df).transform(spark_df)
indexed = indexed.drop("species")
indexed.show()

+------------+-----------+------------+-----------+------------+
|sepal_length|sepal_width|petal_length|petal_width|speciesIndex|
+------------+-----------+------------+-----------+------------+
|         5.1|        3.5|         1.4|        0.2|         0.0|
|         4.9|        3.0|         1.4|        0.2|         0.0|
|         4.7|        3.2|         1.3|        0.2|         0.0|
|         4.6|        3.1|         1.5|        0.2|         0.0|
|         5.0|        3.6|         1.4|        0.2|         0.0|
|         5.4|        3.9|         1.7|        0.4|         0.0|
|         4.6|        3.4|         1.4|        0.3|         0.0|
|         5.0|        3.4|         1.5|        0.2|         0.0|
|         4.4|        2.9|         1.4|        0.2|         0.0|
|         4.9|        3.1|         1.5|        0.1|         0.0|
|         5.4|        3.7|         1.5|        0.2|         0.0|
|         4.8|        3.4|         1.6|        0.2|         0.0|
|         4.8|        3.0

In [None]:
# list  of students  data 
data =[["1","sravan","vignan"],
       ["2","ojaswi","vvit"],
       ["3","rohith","vvit"],
       ["4","sridevi","vignan"],
       ["1","sravan","vignan"], 
       ["5","gnanesh","iit"]]
  
# specify column names
columns=['student ID','student NAME','college']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data,columns)
  
print("Actual data in dataframe")
  
# show dataframe
dataframe.show()

## References
* [Install PySpark in Anaconda & Jupyter Notebook](https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/)
* https://www.geeksforgeeks.org/how-to-delete-columns-in-pyspark-dataframe/