In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import DoubleType,IntegerType
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [27]:
rdd= sc.textFile('WorldCups.csv')
rdd.take(2)

['Year,Country,Winner,Runners-Up,Third,Fourth,GoalsScored,QualifiedTeams,MatchesPlayed,Attendance',
 '1930,Uruguay,Uruguay,Argentina,USA,Yugoslavia,70,13,18,590.549']

In [28]:
header=rdd.first()
final_rdd = rdd.filter(lambda x: x!=header)
rdd = rdd.map(lambda line: (line.split(",")))

In [29]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, IntegerType

schema = StructType([StructField('Year', StringType(), True),
                         StructField('Country', StringType(), True),
                         StructField('Winner', StringType(), True),
                         StructField('Runners-Up', StringType(), True),
                         StructField('Third', StringType(), True),
                         StructField('Fourth', StringType(), True),
                         StructField('GoalsScored', StringType(), True),
                         StructField('QualifiedTeams', StringType(), True),
                         StructField('MatchesPlayed', StringType(), True),
                         StructField('Attendance', StringType(), True)])

df=spark.createDataFrame(rdd,schema=schema)
df = df.withColumn('GoalsScored', df['GoalsScored'].cast(IntegerType()))
df = df.withColumnRenamed('Runners-Up', 'Runnersup')

In [31]:
## 1 For each Stadium finding out the no of goals - RDD
final_rdd.filter(lambda x:x.split(',')[6]!='null')\
.map(lambda x:(x.split(',')[1],x.split(',')[6]))\
.reduceByKey(lambda x,y:(x+y))\
.sortByKey(ascending=False).collect()

## Using RDD 

# -> As it treats everything as lines, so we have separated the fields with comma delimeter
# -> Created a Map function to create array of string which has the Country and No of Goals for each
# -> Used reducebyKey to combine all the values for a key, used Sum
# -> Sortkey by key values and did collect operation on the rdd to display the values

[('Uruguay', '70'),
 ('USA', '141'),
 ('Switzerland', '140'),
 ('Sweden', '126'),
 ('Spain', '146'),
 ('South Africa', '145'),
 ('Mexico', '95132'),
 ('Korea/Japan', '161'),
 ('Italy', '70115'),
 ('Germany', '97147'),
 ('France', '84171'),
 ('England', '89'),
 ('Chile', '89'),
 ('Brazil', '88171'),
 ('Argentina', '102')]

In [49]:
## For each Stadium finding out the no of goals - DF
df.select(['Country','GoalsScored'])\
.filter(df.GoalsScored!=0)\
.groupBy(df.Country).sum()\
.sort(df.Country,ascending=False).show()

## Using DF 

# -> Once after converting to the DF using the Schema method
# -> Now selected the necessary columns
# -> Filtered the columns where goals is not zero
# -> Sortkey by country and did Show operation on the DF to display the values

+------------+----------------+
|     Country|sum(GoalsScored)|
+------------+----------------+
|     Uruguay|              70|
|         USA|             141|
| Switzerland|             140|
|      Sweden|             126|
|       Spain|             146|
|South Africa|             145|
|      Mexico|             227|
| Korea/Japan|             161|
|       Italy|             185|
|     Germany|             244|
|      France|             255|
|     England|              89|
|       Chile|              89|
|      Brazil|             259|
|   Argentina|             102|
+------------+----------------+



In [56]:
# 2 Filter USA Countries - RDD
final_rdd.filter(lambda x:x.split(',')[1]=='USA')\
.collect()

## Using RDD 

# -> As it treats everything as lines, so we have separated the fields with comma delimeter
# -> Created a FIlter function for filtering the records where the Country is USA
# -> Did collect operation on the rdd to display the values

['1994,USA,Brazil,Italy,Sweden,Bulgaria,141,24,52,3.587.538']

In [58]:
# Filter USA Countries - DF
df.filter(df.Country=='USA').show()

## Using DF 

# -> Once after converting to the DF using the Schema method
# -> Now  Filtered the columns where COuntry is not USA
# -> Did Show operation on the DF to display the values

+----+-------+------+---------+------+--------+-----------+--------------+-------------+----------+
|Year|Country|Winner|Runnersup| Third|  Fourth|GoalsScored|QualifiedTeams|MatchesPlayed|Attendance|
+----+-------+------+---------+------+--------+-----------+--------------+-------------+----------+
|1994|    USA|Brazil|    Italy|Sweden|Bulgaria|        141|            24|           52| 3.587.538|
+----+-------+------+---------+------+--------+-----------+--------------+-------------+----------+



In [79]:
# 3 Displaying the distinct Winners - RDD
final_rdd.filter(lambda x:x.split(',')[2]!='null')\
.map(lambda x:x.split(',')[2]).distinct()\
.takeOrdered( 10,key = lambda x: x[0])

## Using RDD 

# -> As it treats everything as lines, so we have separated the fields with comma delimeter
# -> Created a Filter function for filtering the records where the 2nd Index is not NULL
# -> Created a Map Function to get the exact 2 values info and performed Distinct transformation on it 
# -> Did takeOrdered operation on the rdd to display the values in a sorted ordered

['Argentina',
 'Brazil',
 'England',
 'France',
 'Germany FR',
 'Germany',
 'Italy',
 'Spain',
 'Uruguay']

In [82]:
# Displaying the distinct Winners - DF
df.select(df.Winner).distinct().sort(df.Winner).show(9)

## Using DF 

# -> Once after converting to the DF using the Schema method
# -> Now selected the Winner column
# -> Used Distinct transformation operation and sorted on the Winner column

+----------+
|    Winner|
+----------+
| Argentina|
|    Brazil|
|   England|
|    France|
|   Germany|
|Germany FR|
|     Italy|
|     Spain|
|   Uruguay|
+----------+
only showing top 9 rows



In [93]:
# 4 Displaying the records where Country and Winner is same - RDD
final_rdd.map(lambda x:x.split(','))\
.filter(lambda x :x[1]==x[2]).collect()


## Using RDD 

# -> As it treats everything as lines, so we have separated the fields with comma delimeter
# -> Created a Map Function to split the lines at the Comma separeted
# -> Used a filter for fetching the records where the 1st and 2nd index value are same
# -> Did collect action operation on the rdd to display the values in a sorted ordered

[['1930',
  'Uruguay',
  'Uruguay',
  'Argentina',
  'USA',
  'Yugoslavia',
  '70',
  '13',
  '18',
  '590.549'],
 ['1934',
  'Italy',
  'Italy',
  'Czechoslovakia',
  'Germany',
  'Austria',
  '70',
  '16',
  '17',
  '363.000'],
 ['1966',
  'England',
  'England',
  'Germany FR',
  'Portugal',
  'Soviet Union',
  '89',
  '16',
  '32',
  '1.563.135'],
 ['1978',
  'Argentina',
  'Argentina',
  'Netherlands',
  'Brazil',
  'Italy',
  '102',
  '16',
  '38',
  '1.545.791'],
 ['1998',
  'France',
  'France',
  'Brazil',
  'Croatia',
  'Netherlands',
  '171',
  '32',
  '64',
  '2.785.100']]

In [96]:
# Displaying the records where Country and Winner is same - RDD
df.filter(df.Country==df.Winner)\
.show()

## Using DF 

# -> Once after converting to the DF using the Schema method
# -> Used filter operation on the Dataframe for fetching the records where Country and Winner is same
# -> Used Show operation for displaying the records

+----+---------+---------+--------------+--------+------------+-----------+--------------+-------------+----------+
|Year|  Country|   Winner|     Runnersup|   Third|      Fourth|GoalsScored|QualifiedTeams|MatchesPlayed|Attendance|
+----+---------+---------+--------------+--------+------------+-----------+--------------+-------------+----------+
|1930|  Uruguay|  Uruguay|     Argentina|     USA|  Yugoslavia|         70|            13|           18|   590.549|
|1934|    Italy|    Italy|Czechoslovakia| Germany|     Austria|         70|            16|           17|   363.000|
|1966|  England|  England|    Germany FR|Portugal|Soviet Union|         89|            16|           32| 1.563.135|
|1978|Argentina|Argentina|   Netherlands|  Brazil|       Italy|        102|            16|           38| 1.545.791|
|1998|   France|   France|        Brazil| Croatia| Netherlands|        171|            32|           64| 2.785.100|
+----+---------+---------+--------------+--------+------------+---------

In [112]:
# 5 Total People Attended in Each Year
final_rdd.map(lambda x:(x.split(',')[0],x.split(',')[8]))\
.reduceByKey(lambda x,y:x+y).sortByKey().collect()

## Using RDD 

# -> As it treats everything as lines, so we have separated the fields with comma delimeter
# -> Created a Map Function to split the lines at the Comma separeted and taken only 0th and 8th index values
# -> Used reduceByKey transformation on it for summing the records
# -> Did and sortByKey and collect action operation on the rdd to display the values in a sorted ordered

[('1930', '18'),
 ('1934', '17'),
 ('1938', '18'),
 ('1950', '22'),
 ('1954', '26'),
 ('1958', '35'),
 ('1962', '32'),
 ('1966', '32'),
 ('1970', '32'),
 ('1974', '38'),
 ('1978', '38'),
 ('1982', '52'),
 ('1986', '52'),
 ('1990', '52'),
 ('1994', '52'),
 ('1998', '64'),
 ('2002', '64'),
 ('2006', '64'),
 ('2010', '64'),
 ('2014', '64')]

In [114]:
# Total People Attended in Each Year
df = df.withColumn('MatchesPlayed', df['MatchesPlayed'].cast(IntegerType()))
df.select(df.Year,df.MatchesPlayed)\
.filter(df.Year!='Year')\
.groupBy(df.Year).sum()\
.sort(df.Year).show()

## Using DF 

# -> Once after converting to the DF using the Schema method
# -> Converted the MatchesPlayed values to the Interger Type 
# -> Used Select for getting the necessary values 
# -> filter operation is used on the Dataframe for fetching the records where year doesn't has year
# -> Used sort and Show operation for displaying the records

+----+------------------+
|Year|sum(MatchesPlayed)|
+----+------------------+
|1930|                18|
|1934|                17|
|1938|                18|
|1950|                22|
|1954|                26|
|1958|                35|
|1962|                32|
|1966|                32|
|1970|                32|
|1974|                38|
|1978|                38|
|1982|                52|
|1986|                52|
|1990|                52|
|1994|                52|
|1998|                64|
|2002|                64|
|2006|                64|
|2010|                64|
|2014|                64|
+----+------------------+



In [None]:
## RDD VS DataFrame 


# Type Safe
# RDD is Type safe whereas the Data frames are not Type Safe. When converting to the DF 
# from the RDD we need to give the Schema details describing about the data typr of each of the column. But 
# RDD is not required.


# Regarding Optimization 
# When dealing with the RDD, optimization has to be taken care, like if we use at last stage 
# filter then till that park all records would be fetched. But in DF catalyst comes into the picture 
# which takes care this scenario


# Compile Tym 
# Pyspark Dataframes at compile itself the errors are displayed where with the RDD only at the Runtime we
# will kno the issues
