<a href="https://colab.research.google.com/github/pksX01/Python-Tutorials/blob/master/rdd_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Transformations:** map, flatMap, filter, groupBy, groupByKey, reduceByKey, union, sortByKey, join, coalesce, repartition, mapValues, aggregateByKey, etc.<br>
**Actions:** first, take, collect, reduce, countByKey, saveAsTextFile, count, foreach, etc.
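
Transformations are lazy: they only describe a new RDD, and nothing is computed until an action asks for a result. The sketch below is a plain-Python analogy, not Spark code. It uses the built-in lazy `map` as a stand-in for a transformation and `list()` as a stand-in for an action. The names `double`, `numbers` and `evaluated` are hypothetical and exist only for this illustration.

```python
# Plain-Python analogy (not Spark): a lazy map object behaves like a
# transformation, and consuming it plays the role of an action.
evaluated = []  # records which elements have actually been processed


def double(x):
    evaluated.append(x)
    return x * 2


numbers = [1, 2, 3, 4]                 # hypothetical sample data
pipeline = map(double, numbers)        # "transformation": nothing runs yet
work_before_action = list(evaluated)   # still empty at this point

result = list(pipeline)                # "action": forces the evaluation
work_after_action = list(evaluated)

print(work_before_action, result, work_after_action)
```

This is why the cells later in this notebook end a chain of transformations with `collect()`, `take()` or `count()` to see any output.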

In [None]:
import pyspark

In [None]:
from pyspark import SparkContext

In [None]:
# getOrCreate() reuses a running context (e.g. on Databricks) instead of failing
sc = SparkContext.getOrCreate()

In [None]:
sc

**Reading a file**

In [None]:
rdd_text = sc.textFile('/FileStore/tables/healthcare_dataset_stroke_data.csv')
rdd_text

In [None]:
rdd_text.count()

In [None]:
rdd_text.first()

In [None]:
rdd_text.take(5)

In [None]:
for line in rdd_text.take(5):
  print(line)

**Removing duplicate records**

In [None]:
rdd_text_unique = rdd_text.distinct()

In [None]:
rdd_text_unique.count()

**Filtering only data where stroke = '1' and saving that data as CSV**

In [None]:
positive_stroke_data = rdd_text.map(lambda line: line.split(',')).filter(lambda x: x[-1]=='1')

In [None]:
positive_stroke_data.count()

In [None]:
positive_stroke_data.first()
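
The split-then-filter step can be followed on a few lines of text without Spark. This is a minimal sketch with made-up sample lines and a shortened, hypothetical set of columns. It mirrors the `map` and `filter` lambdas used above with list comprehensions.

```python
# Hypothetical sample lines standing in for the CSV file (header + 3 rows).
lines = [
    "id,gender,age,stroke",
    "1,Male,67,1",
    "2,Female,61,0",
    "3,Female,49,1",
]

# Equivalent of .map(lambda line: line.split(','))
rows = [line.split(",") for line in lines]

# Equivalent of .filter(lambda x: x[-1] == '1'). The header row is dropped
# too, because its last field is the text 'stroke', not '1'.
positive_rows = [row for row in rows if row[-1] == "1"]

print(positive_rows)
```

Because the filter also removes the header row, the header has to be added back before the filtered data is saved.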

In [None]:
# Retrieve the header row, split on commas exactly like the data rows
header = sc.parallelize([rdd_text.map(lambda line: line.split(',')).first()])

In [None]:
header.collect()

In [None]:
#combining header and data having stroke = '1'
positive_stroke_data_with_header = header.union(positive_stroke_data)

In [None]:
positive_stroke_data_with_header.count()

In [None]:
positive_stroke_data_with_header.take(2)

In [None]:
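# Save as CSV text to the Databricks File System (DBFS).
# Join each row's fields into one comma-separated line first; otherwise
# saveAsTextFile would write the string form of each Python list.
positive_stroke_data_with_header.map(lambda fields: ','.join(fields)).saveAsTextFile('/FileStore/savedData/positive_stroke_data.csv')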
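
`saveAsTextFile` writes the string representation of each RDD element, one per line, into a directory of part files at the given path. When the elements are Python lists, that representation is not a CSV line. The plain-Python sketch below shows the difference on a single hypothetical row with a shortened set of columns.

```python
row = ["1", "Male", "1"]              # hypothetical row: id, gender, stroke

as_written_by_str = str(row)          # string form of a Python list
as_csv_line = ",".join(row)           # a proper comma-separated line

print(as_written_by_str)
print(as_csv_line)
```

Joining the fields before saving keeps the output readable by a CSV reader.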

In [None]:
# Note: SQLContext is deprecated since Spark 3.0 in favour of SparkSession
from pyspark import SQLContext
sqlCon = SQLContext(sc)

In [None]:
sqlCon.read.csv('/FileStore/savedData/positive_stroke_data.csv').tail(1)

**Finding different values of Residence_type and their count in original data**

In [None]:
def split_lines(line):
  return line.split(',')

In [None]:
rdd_text.map(split_lines).map(lambda values: (values[7], 1)).reduceByKey(lambda a,b : a+b).collect()
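
What `reduceByKey` computes can be sketched in plain Python: for every key, the values are merged pairwise with the supplied function. The helper `reduce_by_key` and the sample pairs are hypothetical and only mimic the result, not Spark's distributed execution.

```python
# Hypothetical (Residence_type, 1) pairs, as produced by the map step.
pairs = [("Urban", 1), ("Rural", 1), ("Urban", 1), ("Urban", 1)]


def reduce_by_key(pairs, func):
    """Merge the values of each key with func, mimicking reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```

Note that the header row of the file is counted as well in the cell above, so the result includes an entry for the column name itself.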

**Find out the number of persons having stroke for each gender**

In [None]:
# 1st method: reduceByKey() and sortByKey() are transformations, so collect() is needed to get the output
rdd_text.map(split_lines).map(lambda x: ((x[1], x[-1]), 1)).reduceByKey(lambda a,b : a+b).sortByKey().collect()

In [None]:
# 2nd method: countByKey() is an action, so collect() is not needed. It returns a dictionary of counts, so sortByKey() (a transformation) cannot be chained after it.
rdd_text.map(split_lines).map(lambda x: ((x[1], x[-1]), 1)).sortByKey().countByKey()
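
Since `countByKey()` hands back a dictionary on the driver, any ordering has to be applied locally afterwards. The following plain-Python sketch uses `collections.Counter` as a stand-in for the returned dictionary. The sample keys are hypothetical.

```python
from collections import Counter

# Hypothetical (gender, stroke) keys, as produced by the map step.
keys = [("Male", "1"), ("Female", "0"), ("Female", "1"), ("Female", "0")]

counts = Counter(keys)                   # dictionary-like, similar to countByKey()
sorted_counts = sorted(counts.items())   # sort the small result locally

print(sorted_counts)
```

Tuples sort lexicographically, so the result is ordered by gender first and by the stroke flag second.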

**Grouping data on the basis of gender**<br>
groupBy returns (key, values) pairs. The key is the result of the grouping function, and the value is an iterable of all records that share that key. To view the records, convert the iterable to a list.

In [None]:
rdd_grouped = rdd_text.map(lambda line: line.split(',')).groupBy(lambda x: x[1])

In [None]:
rdd_grouped.take(2)

In [None]:
print([(key, list(val)) for (key, val) in rdd_grouped.take(4)])

In [None]:
for key, val in rdd_grouped.collect():
  print((key, list(val)))
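
The shape of a `groupBy` result can be sketched in plain Python. The helper `group_by` and the sample rows are hypothetical. Spark returns an iterable per key, while this sketch uses ordinary lists.

```python
from collections import defaultdict

# Hypothetical rows with shortened columns: id, gender, age.
rows = [["1", "Male", "67"], ["2", "Female", "61"], ["3", "Male", "80"]]


def group_by(records, key_func):
    """Collect whole records under the key computed by key_func."""
    groups = defaultdict(list)
    for record in records:
        groups[key_func(record)].append(record)
    return dict(groups)


grouped = group_by(rows, lambda x: x[1])
print(grouped)
```

Each value holds the complete records, including the field that was used as the key.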

**Finding the number of persons of each gender**<br>
groupByKey works on an RDD of (key, value) pairs and returns one (key, values) pair per distinct key, where values is an iterable of all the values that had that key. To view the values, convert the iterable to a list.

In [None]:
rdd_groupedByKey = rdd_text.map(lambda line: line.split(',')).map(lambda x: (x[1],1)).groupByKey()

In [None]:
rdd_groupedByKey.take(2)

In [None]:
# Build the list inside print(); a bare generator expression would only print a generator object
print([(x[0], list(x[1])) for x in rdd_groupedByKey.collect()])

In [None]:
for x in rdd_groupedByKey.collect():
  print((x[0], list(x[1])))

In [None]:
rdd_groupedByKey.mapValues(sum).sortByKey().collect()
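
The `groupByKey` followed by `mapValues(sum)` pattern can be traced in plain Python. This is a sketch on hypothetical sample pairs, not Spark code.

```python
from collections import defaultdict

# Hypothetical (gender, 1) pairs, as produced by the map step.
pairs = [("Male", 1), ("Female", 1), ("Female", 1), ("Other", 1)]

# groupByKey: collect only the values under each key.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# mapValues(sum) followed by sortByKey().
totals = sorted((key, sum(values)) for key, values in grouped.items())

print(dict(grouped))
print(totals)
```

For a plain count or sum like this one, `reduceByKey` gives the same result and is usually the cheaper choice in Spark, because it combines values within each partition before shuffling.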

**map vs flatMap**

In [None]:
rdd_text.flatMap(split_lines).take(20)

In [None]:
rdd_text.map(split_lines).take(2)
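
The difference between the two results can be reproduced in plain Python: `map` keeps one output element per input line (here, one list per line), while `flatMap` flattens those lists into a single sequence of fields. The sample lines are hypothetical.

```python
from itertools import chain

lines = ["1,Male,67", "2,Female,61"]   # hypothetical sample lines

# map: one list of fields per input line.
mapped = [line.split(",") for line in lines]

# flatMap: the per-line lists are flattened into one sequence.
flat_mapped = list(chain.from_iterable(line.split(",") for line in lines))

print(mapped)
print(flat_mapped)
```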