### RESILIENT DISTRIBUTED DATASETS

### To working with RDD use SparkContext

### Run transforamtions before the actions

In [1]:
import re
from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster('local[*]').setAppName('spark-rdd-tut')
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/27 08:38:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/27 08:38:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [17]:
rdd_test = sc.textFile('./data/sample.txt')

In [18]:
rdd_test.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [2]:
def parse_line(line):
    fields = line.split(',')
    station_id = fields[0]
    entry_type = fields[2]
    temperature = float(fields[3]) * 0.1 * (9/5.0) + 32.0
    temperature = (temperature - 32) * 5/9

    return(station_id, entry_type, temperature)

lines = sc.textFile("./data/1800.csv")
parsed_lines = lines.map(parse_line)
min_temps = parsed_lines.filter(lambda x: "TMIN" in x[1])
max_temps = parsed_lines.filter(lambda x: "TMAX" in x[1])
station_temps_min = min_temps.map(lambda x: (x[0], x[2]))
station_temps_max = max_temps.map(lambda x: (x[0], x[2]))
stations = station_temps_max.map(lambda x: (x[0]))
print(set(stations.collect()))
min_temps = station_temps_min.reduceByKey(lambda x,y: min(x,y))
max_temps = station_temps_max.reduceByKey(lambda x,y: max(x,y))
results_min = min_temps.collect()
results_max = max_temps.collect()

for result in results_min:
    print(f"Station {result[0]}: {round(result[1],2)}\N{DEGREE SIGN}C")

for result in results_max:
    print(f"Station {result[0]}: {round(result[1],2)}\N{DEGREE SIGN}C")


                                                                                

{'ITE00100554', 'EZE00100082'}


                                                                                

Station ITE00100554: -14.8°C
Station EZE00100082: -13.5°C
Station ITE00100554: 32.3°C
Station EZE00100082: 32.3°C


In [3]:
#conf = SparkConf().setMaster("local").setAppName("WordCount")
#sc = SparkContext(conf=conf)

input = sc.textFile('./data/book.txt')
words = input.flatMap(lambda x: x.split())
word_counts = words.countByValue()

for word, count in word_counts.items():
    clean_word = word.encode('ascii', 'ignore')
    if (clean_word):
        #print(clean_word, count)
        pass


In [4]:
import re 

In [5]:
def normalize_words(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

input = sc.textFile('./data/book.txt')
words = input.flatMap(normalize_words)
word_counts = words.countByValue()

for word, count in word_counts.items():
    clean_word = word.encode('ascii', 'ignore')
    if clean_word:
        #print(clean_word, count)
        pass

In [6]:
def normalize_words(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())


input = sc.textFile('./data/book.txt')
words = input.flatMap(normalize_words)

word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

word_counts_sorted = word_counts.map(lambda x: (x[1],x[0])).sortByKey()

results = word_counts_sorted.collect()

for result in results:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if word:
        #print(f'{word}: {count}')
        pass

In [7]:
def parse_line(line):
    fields = line.split(',')
    user_id = fields[0]
    price_type = float(fields[2])
    
    return(user_id, price_type)


input_orders = sc.textFile('./data/customer-orders.csv')
orders_by_users = input_orders.map(parse_line)

orders_counts = orders_by_users.reduceByKey(lambda x, y: x + y)
sorted_orders_by_users = orders_counts.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

for i,s in enumerate(sorted_orders_by_users.collect()):
    if i < 6:
        formated = float("{:.2f}".format(s[0]))
        print(f"User: {s[1]} -> {formated}$ ")
    else:
        break


User: 68 -> 6375.45$ 
User: 73 -> 6206.2$ 
User: 39 -> 6193.11$ 
User: 54 -> 6065.39$ 
User: 71 -> 5995.66$ 
User: 2 -> 5994.59$ 


#### DATAFRAMES

### Use SparkSession when creating a dataframe

In [2]:
from pyspark.sql.types import (StructType, StructField, 
                               IntegerType, BinaryType,
                              DateType, DoubleType,
                            LongType, TimestampType)

from pyspark.sql import Row
from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession \
        .builder \
        .master('local[*]') \
        .getOrCreate()

In [16]:
df_wo = spark.read.csv('./data/london_weather.csv', header=True, inferSchema=True)

In [17]:
df_wo.show(10, truncate=True)

+--------+-----------+--------+----------------+--------+---------+--------+-------------+--------+----------+
|    date|cloud_cover|sunshine|global_radiation|max_temp|mean_temp|min_temp|precipitation|pressure|snow_depth|
+--------+-----------+--------+----------------+--------+---------+--------+-------------+--------+----------+
|19790101|        2.0|     7.0|            52.0|     2.3|     -4.1|    -7.5|          0.4|101900.0|       9.0|
|19790102|        6.0|     1.7|            27.0|     1.6|     -2.6|    -7.5|          0.0|102530.0|       8.0|
|19790103|        5.0|     0.0|            13.0|     1.3|     -2.8|    -7.2|          0.0|102050.0|       4.0|
|19790104|        8.0|     0.0|            13.0|    -0.3|     -2.6|    -6.5|          0.0|100840.0|       2.0|
|19790105|        6.0|     2.0|            29.0|     5.6|     -0.8|    -1.4|          0.0|102250.0|       1.0|
|19790106|        5.0|     3.8|            39.0|     8.3|     -0.5|    -6.6|          0.7|102780.0|       1.0|
|

In [18]:
df_wo_new = df_wo.withColumn('new_date', F.to_date(F.col("date"), 'yyyyMMdd'))

In [19]:
df_wo_new = df_wo_new.drop('date')

In [20]:
df_wo_new = df_wo_new.filter(df_wo_new.new_date >= F.lit("2017-12-01").cast(DateType())) 
df_wo_new = df_wo_new.filter(df_wo_new.new_date <= F.lit("2019-12-31").cast(DateType()))

In [21]:
df_wo_new.show(truncate=False)

+-----------+--------+----------------+--------+---------+--------+-------------+--------+----------+----------+
|cloud_cover|sunshine|global_radiation|max_temp|mean_temp|min_temp|precipitation|pressure|snow_depth|new_date  |
+-----------+--------+----------------+--------+---------+--------+-------------+--------+----------+----------+
|6.0        |3.5     |39.0            |7.5     |4.0      |0.8     |0.0          |102090.0|0.0       |2017-12-01|
|8.0        |0.0     |13.0            |10.4    |5.0      |2.5     |0.4          |102710.0|0.0       |2017-12-02|
|8.0        |0.0     |13.0            |10.0    |6.7      |3.0     |0.2          |102700.0|0.0       |2017-12-03|
|7.0        |0.0     |13.0            |9.4     |7.0      |4.0     |0.0          |103330.0|0.0       |2017-12-04|
|8.0        |0.0     |13.0            |11.6    |8.1      |6.8     |0.0          |103540.0|0.0       |2017-12-05|
|8.0        |0.0     |13.0            |12.5    |9.7      |7.7     |0.2          |102730.0|0.0   

In [22]:
def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
               age=int(fields[2]), num_friends=int(fields[3]))

lines = spark.sparkContext.textFile('./data/fakefriends.csv')
people = lines.map(mapper)

schema_people = spark.createDataFrame(people).cache()
schema_people.createOrReplaceTempView('people')

teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

for teen in teenagers.collect():
    print(teen)

schema_people.groupBy("age").count().orderBy("age").show()

spark.stop()



                                                                                

Row(ID=21, name="b'Miles'", age=19, num_friends=268)
Row(ID=52, name="b'Beverly'", age=19, num_friends=269)
Row(ID=54, name="b'Brunt'", age=19, num_friends=5)
Row(ID=106, name="b'Beverly'", age=18, num_friends=499)
Row(ID=115, name="b'Dukat'", age=18, num_friends=397)
Row(ID=133, name="b'Quark'", age=19, num_friends=265)
Row(ID=136, name="b'Will'", age=19, num_friends=335)
Row(ID=225, name="b'Elim'", age=19, num_friends=106)
Row(ID=304, name="b'Will'", age=19, num_friends=404)
Row(ID=341, name="b'Data'", age=18, num_friends=326)
Row(ID=366, name="b'Keiko'", age=19, num_friends=119)
Row(ID=373, name="b'Quark'", age=19, num_friends=272)
Row(ID=377, name="b'Beverly'", age=18, num_friends=418)
Row(ID=404, name="b'Kasidy'", age=18, num_friends=24)
Row(ID=409, name="b'Nog'", age=19, num_friends=267)
Row(ID=439, name="b'Data'", age=18, num_friends=417)
Row(ID=444, name="b'Keiko'", age=18, num_friends=472)
Row(ID=492, name="b'Dukat'", age=19, num_friends=36)
Row(ID=494, name="b'Kasidy'", age=1

In [28]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

people = spark.read \
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv('./data/fakefriends-header.csv')

print('Here is our inferred schema:')
people.printSchema()

print('Let\'s display the name column:')
people.select('name').show()

print('Filter out anyone over 21:')
people.filter(people.age < 21).show()

print('Group by age')
people.groupBy('age').count().show()

print('Make everyone 10 years older:')
people.select(people.name, people.age + 10).show()

Here is our inferred schema:
root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)

Let's display the name column:
+--------+
|    name|
+--------+
|    Will|
|Jean-Luc|
|    Hugh|
|  Deanna|
|   Quark|
|  Weyoun|
|  Gowron|
|    Will|
|  Jadzia|
|    Hugh|
|     Odo|
|     Ben|
|   Keiko|
|Jean-Luc|
|    Hugh|
|     Rom|
|  Weyoun|
|     Odo|
|Jean-Luc|
|  Geordi|
+--------+
only showing top 20 rows

Filter out anyone over 21:
+------+-------+---+-------+
|userID|   name|age|friends|
+------+-------+---+-------+
|    21|  Miles| 19|    268|
|    48|    Nog| 20|      1|
|    52|Beverly| 19|    269|
|    54|  Brunt| 19|      5|
|    60| Geordi| 20|    100|
|    73|  Brunt| 20|    384|
|   106|Beverly| 18|    499|
|   115|  Dukat| 18|    397|
|   133|  Quark| 19|    265|
|   136|   Will| 19|    335|
|   225|   Elim| 19|    106|
|   304|   Will| 19|    404|
|   327| Julian| 20|     63|
| 

In [29]:
friends = spark.read\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv('./data/fakefriends-header.csv')



In [36]:
friends.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [43]:
friends_by_age = friends.select('age', 'friends')

friends_by_age.groupBy('age').avg('friends').sort('age').show()




+---+------------------+
|age|      avg(friends)|
+---+------------------+
| 18|           343.375|
| 19|213.27272727272728|
| 20|             165.0|
| 21|           350.875|
| 22|206.42857142857142|
| 23|             246.3|
| 24|             233.8|
| 25|197.45454545454547|
| 26|242.05882352941177|
| 27|           228.125|
| 28|             209.1|
| 29|215.91666666666666|
| 30| 235.8181818181818|
| 31|            267.25|
| 32| 207.9090909090909|
| 33| 325.3333333333333|
| 34|             245.5|
| 35|           211.625|
| 36|             246.6|
| 37|249.33333333333334|
+---+------------------+
only showing top 20 rows



In [47]:
friends_by_age.groupBy('age')\
    .agg(F.round(F.avg('friends'),2))\
    .sort('age').show()


+---+----------------------+
|age|round(avg(friends), 2)|
+---+----------------------+
| 18|                343.38|
| 19|                213.27|
| 20|                 165.0|
| 21|                350.88|
| 22|                206.43|
| 23|                 246.3|
| 24|                 233.8|
| 25|                197.45|
| 26|                242.06|
| 27|                228.13|
| 28|                 209.1|
| 29|                215.92|
| 30|                235.82|
| 31|                267.25|
| 32|                207.91|
| 33|                325.33|
| 34|                 245.5|
| 35|                211.63|
| 36|                 246.6|
| 37|                249.33|
+---+----------------------+
only showing top 20 rows



In [48]:
friends_by_age.groupBy('age')\
    .agg(F.round(F.avg('friends'),2).alias('friends_avg'))\
    .sort('age').show()

+---+-----------+
|age|friends_avg|
+---+-----------+
| 18|     343.38|
| 19|     213.27|
| 20|      165.0|
| 21|     350.88|
| 22|     206.43|
| 23|      246.3|
| 24|      233.8|
| 25|     197.45|
| 26|     242.06|
| 27|     228.13|
| 28|      209.1|
| 29|     215.92|
| 30|     235.82|
| 31|     267.25|
| 32|     207.91|
| 33|     325.33|
| 34|      245.5|
| 35|     211.63|
| 36|      246.6|
| 37|     249.33|
+---+-----------+
only showing top 20 rows



In [4]:
input_df = spark.read.text("./data/book.txt")
words = input_df.select(F.explode(F.split(input_df.value, '\\W+')).alias('word'))
words.filter(words.word != '')

lower_case_words = words.select(F.lower(words.word).alias('word'))



In [5]:
lower_case_words

DataFrame[word: string]