In [12]:
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType
from pyspark.sql import Row
# Echo the SparkContext as the cell's output to confirm it is available.
# NOTE(review): `sc` is not created anywhere in this notebook; presumably it is
# injected by the PySpark kernel/shell — confirm before running elsewhere.
sc


In [5]:
# Build an RDD of (i, i + 100) pairs for i in 0..99.
test = sc.parallelize(zip(range(100),range(100,200)))
print(test)

# Keep only the pairs whose first value is even.
#    RDD.filter is a transformation: it returns a NEW RDD and leaves `test`
#    unchanged, so the result must be bound to a name to be usable at all.
test_even_first = test.filter(lambda x:x[0] % 2 == 0)

# Map the first value of each pair to its square.
#    RDD.map returns a new RDD by applying a function to each element of this RDD.

# Example (left disabled): square the first value of every tuple.
# print(test.map(lambda x:x[0] ** 2 ).collect())

# Define a plain Python function that squares its input; it is passed to RDD.map below (no Spark SQL UDF is involved).
def square_function(x):
    """Return ``x`` raised to the second power.

    Args:
        x: Any value that supports exponentiation (e.g. int or float).

    Returns:
        The square of ``x``.
    """
    return pow(x, 2)


# Build a small RDD holding 0..9 and print the square of every element.
test2 = sc.parallelize(range(10))
print(
    test2
    .map(square_function)
    .collect()
)

# For the RDD of pairs, square only the first item of each pair.
squared_list = test.map(lambda pair: square_function(pair[0]))
print(squared_list.collect())

# Label each squared value: 'Touche' if it is below 5000 and its first digit is not 4, otherwise 'Not valid!'.
def less_than_5000_not_four(x):
    """Pair ``x`` with a label describing whether it passes two checks.

    The value passes when it is strictly below 5000 and the first character
    of ``str(x)`` is not ``'4'``.

    Args:
        x: The number to classify.

    Returns:
        A tuple ``(x, label)`` where ``label`` is ``'Touche'`` when both
        checks pass and ``"Not valid!"`` otherwise.
    """
    # The string check is only evaluated when the size check succeeds.
    passes = x < 5000 and str(x)[0] != '4'
    return (x, 'Touche' if passes else "Not valid!")

# Pair every squared value with its label, keeping the resulting RDD of
# (value, label) tuples under a name so it can be reused.
touche_binned_RDD = squared_list.map(less_than_5000_not_four)

# Show the labelled tuples.
print(touche_binned_RDD.collect())


ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:480
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]
[(0, 'Touche'), (1, 'Touche'), (4, 'Not valid!'), (9, 'Touche'), (16, 'Touche'), (25, 'Touche'), (36, 'Touche'), (49, 'Not valid!'), (64, 'Touche'), (81, 'Touche'), (100, 'Touche'), (121, 'Touche'), (144, 'Touche'), (169, 'Touche'), (196, 'Touche'), (225, 'Touche'), (256, 'Touche'), (289, 'Touche'), (324, 'Touche'), (361, 'Touche'), (400, 'No

In [26]:
def get_even_value_list(x):
    """Expand an even number into a five-element list of itself.

    Args:
        x: The number to inspect.

    Returns:
        ``[x, x, x, x, x]`` when ``x`` is even; otherwise the string
        ``"None"`` (a str, not the ``None`` object).
    """
    return [x] * 5 if x % 2 == 0 else "None"


# Apply get_even_value_list to the first item of every pair and print the results.
print(
    test
    .map(lambda pair: get_even_value_list(pair[0]))
    .collect()
)


## What does Row do in a map function?


[[0, 0, 0, 0, 0], 'None', [2, 2, 2, 2, 2], 'None', [4, 4, 4, 4, 4], 'None', [6, 6, 6, 6, 6], 'None', [8, 8, 8, 8, 8], 'None', [10, 10, 10, 10, 10], 'None', [12, 12, 12, 12, 12], 'None', [14, 14, 14, 14, 14], 'None', [16, 16, 16, 16, 16], 'None', [18, 18, 18, 18, 18], 'None', [20, 20, 20, 20, 20], 'None', [22, 22, 22, 22, 22], 'None', [24, 24, 24, 24, 24], 'None', [26, 26, 26, 26, 26], 'None', [28, 28, 28, 28, 28], 'None', [30, 30, 30, 30, 30], 'None', [32, 32, 32, 32, 32], 'None', [34, 34, 34, 34, 34], 'None', [36, 36, 36, 36, 36], 'None', [38, 38, 38, 38, 38], 'None', [40, 40, 40, 40, 40], 'None', [42, 42, 42, 42, 42], 'None', [44, 44, 44, 44, 44], 'None', [46, 46, 46, 46, 46], 'None', [48, 48, 48, 48, 48], 'None', [50, 50, 50, 50, 50], 'None', [52, 52, 52, 52, 52], 'None', [54, 54, 54, 54, 54], 'None', [56, 56, 56, 56, 56], 'None', [58, 58, 58, 58, 58], 'None', [60, 60, 60, 60, 60], 'None', [62, 62, 62, 62, 62], 'None', [64, 64, 64, 64, 64], 'None', [66, 66, 66, 66, 66], 'None', [68,

### Turn RDD into a dataframe

In [7]:
# Aggregate a column with groupBy.
# Step 1: turn the RDD of (value, label) tuples into a DataFrame with named columns.
touche_binned_df = sqlContext.createDataFrame(
    touche_binned_RDD,
    schema=['Number', 'Logic'],
)

# Step 2: group by the label and print the mean, then the max, of the Number column.
for aggregate in ('mean', 'max'):
    print(touche_binned_df.groupBy('Logic').agg({'Number': aggregate}).collect())

[Row(Logic='Not valid!', avg(Number)=5960.585365853659), Row(Logic='Touche', avg(Number)=1423.1525423728813)]
[Row(Logic='Not valid!', max(Number)=9801), Row(Logic='Touche', max(Number)=3969)]


## Explode a column into individual rows


In [19]:
# Create a one-row DataFrame whose my_list cell holds several values.
explode_df = spark.createDataFrame([Row(a = 1, my_list = [1,2,3,4])])
# Print the DataFrame's rows. This previously referenced `explode_RDD`, a name
# never defined in this notebook, which raises NameError on a fresh kernel.
print("Data frame with multiple entries in a single cell: \n {0}".format(explode_df.collect()))

# Explode the my_list cell list to multiple rows (one row per list element).
post_explode_df = explode_df.select(explode('my_list').alias('int_rows')).collect()

print(" \n Post explode dataframe: \n {0}".format(post_explode_df))

# Explode the column while preserving the rest of the dataframe.
# NOTE: this rebinds post_explode_df; because of .collect() the value is a list
# of Row objects rather than a DataFrame.
post_explode_df = explode_df.withColumn("exploded_col", col=explode('my_list')).collect()

Data frame with multiple entries in a single cell: 
 [Row(a=1, my_list=[1, 2, 3, 4])]
 
 Post explode dataframe: 
 [Row(int_rows=1), Row(int_rows=2), Row(int_rows=3), Row(int_rows=4)]
