In [1]:
# Import the SparkContext class from the pyspark library.
# This is the entry point for Spark functionality.
from pyspark import SparkContext

Initializing a SparkContext


In [2]:
# Import SparkContext, the entry point for Spark functionality.
# NOTE(review): this repeats the import from the first cell; it is harmless but redundant.
from pyspark import SparkContext

# Get or create a SparkContext.
# getOrCreate() reuses an existing context if one is available and otherwise creates a new one.
sc = SparkContext.getOrCreate()

Simple lambda expression example: squaring a number.



In [3]:
# 'square' is a one-argument lambda: it returns its argument raised to the power of 2.
square = lambda x: x ** 2
# Try it out on 5 and print the result.
print(square(5))

25


Simple lambda expression example: adding two numbers.



In [4]:
# 'add' is a two-argument lambda: it returns x + y.
add = lambda x, y: x + y
# Try it out on 5 and 10 and print the result.
print(add(5, 10))

15


The next cell uses the `%%writefile` cell magic to create `example.txt`, the input data for the RDD examples. A cell magic must be the very first line of its cell, and every following line is written to the file verbatim, so the cell itself contains no comments.

In [5]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


Loads the example text file into a Spark RDD.



In [6]:
# Create an RDD named 'textFile' from 'example.txt' using SparkContext.textFile().
# Each line of the file becomes one element of the RDD.
textFile = sc.textFile('example.txt')

Performs the `count` action on the RDD.



In [7]:
# count() is an action: it triggers evaluation of the RDD and returns the
# number of elements it contains, which here is the number of lines in the file.
textFile.count()

4

Performs the `first` action on the RDD.



In [8]:
# first() is an action: it triggers computation and returns the first
# element of the RDD, i.e. the first line of the file.
textFile.first()

'first line'

In [9]:
# filter() is a transformation: it builds a new RDD holding only the elements
# for which the predicate is true. The predicate here is a substring test, so
# any line containing the text 'second' is kept.
secfind = textFile.filter(lambda text: 'second' in text)

Displaying the RDD object itself shows only its description (RDD type and ID), not the data it contains.



In [10]:
# Evaluating the RDD variable on its own does not display its data.
# Transformations are lazy, so the notebook only shows the RDD's description
# (its type and ID); an action such as collect() is needed to see the elements.
secfind

PythonRDD[4] at RDD at PythonRDD.scala:53

- Explaining the `collect()` action and its purpose in bringing RDD data to the driver program.

- Demonstrating `collect()`, then creating a second example file and loading it into an RDD.



In [11]:
# collect() is an action: it brings every element of the RDD back to the
# driver program so the actual data can be viewed.
# Use it with caution on large RDDs, since pulling everything to the driver can cause out-of-memory errors.
secfind.collect()

['second line']

The next cell writes `example2.txt`, whose lines contain varying numbers of words; it is the input for the `map` and `flatMap` examples. `%%writefile` must be the first line of its cell and everything after it is written to the file verbatim, so the explanation lives here rather than in comments inside the cell.

In [12]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Overwriting example2.txt


In [13]:
# Calling sc.textFile() only builds an RDD object; the file is not read until
# an action runs (lazy evaluation).
# The displayed value is the RDD's description (path, RDD type and ID), not its contents.
# The result is not assigned to a name here.
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [14]:
# Keep a reference to the RDD for 'example2.txt' in 'text_rdd' so that
# transformations and actions can be applied to it.
text_rdd = sc.textFile('example2.txt')

In [15]:
# map() applies the given function to every element and yields exactly one
# output element per input element. Each line is split on whitespace here,
# so the resulting RDD holds one list of words per original line.
words = text_rdd.map(lambda text: text.split())

In [16]:
# collect() triggers the map transformation and returns the results to the driver.
# Each original line appears as its own list of words, so the output is a list of lists.
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [17]:
# For comparison, collect the untransformed RDD: each element is still one
# raw line of text from the file.
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

In [18]:
# flatMap() works like map() but flattens the per-element results: instead of
# one list of words per line, the output is a single flat sequence of all
# words from all lines. collect() runs the transformation and returns the words to the driver.
text_rdd.flatMap(lambda text: text.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

The next cell writes `services.txt`, a small structured dataset with a `#`-prefixed header row followed by six records; it is the input for the data-processing examples. `%%writefile` must be the first line of its cell and everything after it is written to the file verbatim, so the explanation lives here rather than in comments inside the cell.

In [19]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201         10/13/2017   100        NY       131          100.00
202         10/14/2017   101        CA       132          150.50
203         10/15/2017   102        TX       133          200.75
204         10/16/2017   103        CA       134          120.00
205         10/17/2017   104        NY       135          180.25
206         10/18/2017   105        TX       136          250.00

Overwriting services.txt


In [20]:
# Load 'services.txt' into an RDD named 'services'.
# sc.textFile() creates one RDD element per line of the file, header line included.
services = sc.textFile('services.txt')

In [21]:
# take(n) is an action that returns the first n elements of the RDD as a list.
# Taking two elements shows the header line and the first data record, which
# is enough to see how the file is structured.
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201         10/13/2017   100        NY       131          100.00']

In [22]:
# Split every line on whitespace to break it into individual fields, then use
# take(3) to trigger the computation and preview the first three parsed rows.
services.map(lambda row: row.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['202', '10/14/2017', '101', 'CA', '132', '150.50']]

In [23]:
# Strip the leading '#' from the header line so it reads like ordinary column names;
# every other line is returned unchanged.
# str.startswith() is used instead of indexing line[0] so that an empty line
# is passed through unchanged rather than raising IndexError.
# collect() triggers the transformation and displays all resulting lines.
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201         10/13/2017   100        NY       131          100.00',
 '202         10/14/2017   101        CA       132          150.50',
 '203         10/15/2017   102        TX       133          200.75',
 '204         10/16/2017   103        CA       134          120.00',
 '205         10/17/2017   104        NY       135          180.25',
 '206         10/18/2017   105        TX       136          250.00']

In [24]:
# Apply a 'map' transformation to the 'services' RDD to remove the '#' character from the header line.
# The lambda function checks if a line starts with '#'. If it does, it returns the line from the second character onwards (removing '#').
# If the line does not start with '#', the original line is returned.
# We are doing this to clean the header row so it doesn't contain the comment character.
# This transformation is stored in a new RDD called 'clean'.
clean = services.map(lambda line: line [1:] if line [0]=='#' else line)

In [25]:
# Apply a 'map' transformation to the 'clean' RDD.
# The lambda function 'lambda line: line.split()' is applied to each line, splitting it into a list of strings based on whitespace.
# This is done to parse the data from each line into individual fields (columns) which can then be easily accessed and processed.
clean = clean.map(lambda line: line.split())

In [26]:
# Collect the parsed RDD to inspect the result of the two previous steps:
# each row is now a list of field strings, and the header no longer starts with '#'.
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['202', '10/14/2017', '101', 'CA', '132', '150.50'],
 ['203', '10/15/2017', '102', 'TX', '133', '200.75'],
 ['204', '10/16/2017', '103', 'CA', '134', '120.00'],
 ['205', '10/17/2017', '104', 'NY', '135', '180.25'],
 ['206', '10/18/2017', '105', 'TX', '136', '250.00']]

In [27]:
# Turn each parsed row into a (State, Amount) key-value pair:
# index 3 holds the State column and the last field holds the Amount.
# Keyed pairs are the shape needed for aggregating by state.
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [28]:
# Collect the (State, Amount) pairs to verify the extraction.
# Note that the header row is still present as ('State', 'Amount') and that
# the amounts are still strings at this point.
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('CA', '150.50'),
 ('TX', '200.75'),
 ('CA', '120.00'),
 ('NY', '180.25'),
 ('TX', '250.00')]

In [29]:
# Total the service amount per state.
#   1. filter(): drop the header pair ('State', 'Amount').
#   2. mapValues(float): convert every amount from string to float up front.
#      reduceByKey() only calls its function when a key has two or more values,
#      so converting inside the reducer would leave the total of a state with a
#      single record as a string.
#   3. reduceByKey(): add up the (already numeric) amounts that share a state.
rekey = (
    pairs
    .filter(lambda pair: pair[0] != 'State')
    .mapValues(float)
    .reduceByKey(lambda amt1, amt2: amt1 + amt2)
)

In [30]:
# Collect the aggregated result to the driver: one (State, total amount) pair per state.
rekey.collect()

[('NY', 280.25), ('CA', 270.5), ('TX', 450.75)]

In [31]:
# Re-inspect 'clean' (the header row plus the data records, all fields still
# strings) to confirm its state before it is reused.
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['202', '10/14/2017', '101', 'CA', '132', '150.50'],
 ['203', '10/15/2017', '102', 'TX', '133', '200.75'],
 ['204', '10/16/2017', '103', 'CA', '134', '120.00'],
 ['205', '10/17/2017', '104', 'NY', '135', '180.25'],
 ['206', '10/18/2017', '105', 'TX', '136', '250.00']]

In [32]:
# Step 1: extract (State, Amount) pairs from the parsed rows.
step1 = clean.map(lambda lst: (lst[3], lst[-1]))

# Step 2: drop the header pair and convert every amount to float *before* aggregating.
# reduceByKey() only calls its function when a key has two or more values, so
# converting inside the reducer would leave the amount of a single-record state
# as a string, and the numeric sort further down would then fail on mixed
# str/float values. Removing the header first also keeps the non-numeric
# 'Amount' label away from float().
step2 = step1.filter(lambda pair: pair[0] != 'State').mapValues(float)

# Step 3: sum the amounts for each state.
step3 = step2.reduceByKey(lambda amt1, amt2: amt1 + amt2)

# Step 4: sort by total amount in descending order so the states with the
# highest totals come first.
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)

# Action: collect the sorted totals to the driver program for display.
step4.collect()

[('TX', 450.75), ('NY', 280.25), ('CA', 270.5)]

In [33]:
# A small three-element sample list (plain Python, no Spark involved),
# used to demonstrate accessing elements by index and by unpacking.
x = ['ID', 'State', 'Amount']

In [34]:
# 'func1' shows the simplest way to get at the last item of a list: negative indexing.
def func1(lst):
    """Return the last element of ``lst``.

    The element is fetched with ``lst[-1]``, so an empty sequence raises
    ``IndexError``.
    """
    last_item = lst[-1]
    return last_item

In [35]:
# 'func2' shows the alternative to indexing: unpack the whole record into
# named parts and return the one that is needed.
def func2(id_st_amt):
    """Return the third item of a three-item iterable.

    The argument is unpacked into exactly three values, so any iterable of
    length three (list, tuple, ...) is accepted; any other length raises
    ``ValueError``.
    """
    _id, _state, amount = id_st_amt
    return amount

In [36]:
# func1 returns the last element of its argument, so for 'x' the result is 'Amount'.
func1(x)

'Amount'

In [37]:
# func2 unpacks its argument into three names (Id, st, amt) and returns 'amt'.
# 'x' is a list of exactly three items, so the unpacking succeeds and the result is 'Amount'.
# Unpacking works with any iterable, not just tuples, provided the number of elements matches.
func2(x)

'Amount'