In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("Tutorial2_DataFrames")\
        .getOrCreate()

22/11/09 16:26:57 WARN Utils: Your hostname, bigdata-vmware resolves to a loopback address: 127.0.1.1; using 192.168.10.135 instead (on interface ens33)
22/11/09 16:26:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/09 16:26:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Shuts down spark session
spark.stop()

In [16]:
#df = spark.read.json("data/people.json")
df = spark.read.csv("data/people.csv", header=True)
#df = spark.read.parquet("data/users.parquet")

df.printSchema()
df.show()
df.show(3)
df.collect()
#df.first()
#df.count()


root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)

+-----+---+--------------+
| name|age|           job|
+-----+---+--------------+
|Maria| 25|       Manager|
| João| 30|Data Scientist|
|Jorge| 30|     Developer|
| Rita| 28|Data Scientist|
|  Bob| 32|     Developer|
+-----+---+--------------+

+-----+---+--------------+
| name|age|           job|
+-----+---+--------------+
|Maria| 25|       Manager|
| João| 30|Data Scientist|
|Jorge| 30|     Developer|
+-----+---+--------------+
only showing top 3 rows



[Row(name='Maria', age='25', job='Manager'),
 Row(name='João', age='30', job='Data Scientist'),
 Row(name='Jorge', age='30', job='Developer'),
 Row(name='Rita', age='28', job='Data Scientist'),
 Row(name='Bob', age='32', job='Developer')]

In [8]:
# Steps to load data from hdfs:
# 1) You should exit VS Code. Why? Because python uses the same port (9000) as hdfs.
# 2) You should start hdfs and yarn and copy your data to hdfs.
# 3) Reopen VS Code.
# 4) Open file /home/bigdata/hadoop-3.4.4/etc/hadoop/core-site.xml and copy the value of property "fs.defaultFS", which represents the Hadoop name node path. In our case, it should have value "hdfs://localhost:9000".
# 4) Start the spark session (first code cell of this notebook).
# 5) Run this code cell.

#df = spark.read.text("hdfs://localhost:9000/user/bigdata/data/book.txt")
df = spark.read.csv("hdfs://localhost:9000/user/bigdata/data/people.csv", header=True)
df.show()

In [45]:
rows = [['Maria', 25, 'Manager'],
        ['João', 30, 'Data Scientist'],
        ['Jorge', 30, 'Developer'],
        ['Rita', 28, 'Data Scientist'],
        ['António', 32, 'Developer']]

columns = ['Name', 'age', 'job']
  
dataframe = spark.createDataFrame(rows, columns)
  
dataframe.show()

+-------+---+--------------+
|   Name|age|           job|
+-------+---+--------------+
|  Maria| 25|       Manager|
|   João| 30|Data Scientist|
|  Jorge| 30|     Developer|
|   Rita| 28|Data Scientist|
|António| 32|     Developer|
+-------+---+--------------+



In [36]:
# Select only the "name" and "age" columns

df1 = spark.read.csv("data/people.csv", header=True)

df2 = df1.select("name", "age")

df2.show()

+-----+---+
| name|age|
+-----+---+
|Jorge| 30|
|  Bob| 32|
+-----+---+



In [39]:
# Select all the columns, but increment the age by 1

df = spark.read.csv("data/people.csv", header=True)

df.select("name", df['age'] + 1, "job").show()

+-----+---------+---------+
| name|(age + 1)|      job|
+-----+---------+---------+
|Jorge|     31.0|Developer|
|  Bob|     33.0|Developer|
+-----+---------+---------+



In [42]:
# Select people older than 21

df = spark.read.csv("data/people.csv", header=True)

df.filter(df['age'] > 28).show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [47]:
# Count people by age

df = spark.read.csv("data/people.csv", header=True)

df = df.groupBy("age").count().show()

# groupBy() returns a GroupData, wich consists in set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().
# The count() method of GroupData counts the number of records for each group and returns a DataFrame.

+---+-----+
|age|count|
+---+-----+
| 30|    2|
| 25|    1|
| 32|    1|
+---+-----+



Working with SQL

In [5]:
df = spark.read.csv("data/people.csv", header=True)

df.createOrReplaceTempView("vintages")

sqlDF = spark.sql("SELECT * FROM vintages WHERE age < 30")
sqlDF.show()

# See also createGlobalTempView()

+-----+---+--------------+
| name|age|           job|
+-----+---+--------------+
|Maria| 25|       Manager|
| Rita| 28|Data Scientist|
+-----+---+--------------+



In [30]:
# Run SQL on files directly

df = spark.sql("SELECT * FROM parquet.`data/users.parquet`")

df.show()

[Stage 49:>                                                         (0 + 1) / 1]

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



                                                                                

Writing a data frame to a file

In [35]:
# Select only the "name" and "age" columns

df1 = spark.read.csv("data/people.csv", header=True)

df2 = df1.select("name", "age")

df2.show()

print(df2.rdd.getNumPartitions())

df2.write.csv("data/people_name_age", header=True)

# Alternative
#df2.write.format("csv").save("data/people_name_age")

+-----+---+
| name|age|
+-----+---+
|Maria| 25|
| João| 30|
|Jorge| 30|
| Rita| 28|
|  Bob| 32|
+-----+---+

1


Working with text files

In [49]:
df = spark.read.text("data/hello.txt")

df.show()
df.collect()

+-----------+
|      value|
+-----------+
| hello word|
|  Hello spa|
|hello spark|
|spark hello|
+-----------+



[Row(value='hello word'),
 Row(value='Hello spa'),
 Row(value='hello spark'),
 Row(value='spark hello')]

In [36]:
# Lines containing some word

df = spark.read.text("data/hello.txt")

linesWithSpark = df.filter(df.value.contains("spark"))

linesWithSpark.show()

df.filter(df.value.contains("spark")).count()

+---------------+
|          value|
+---------------+
|    hello spark|
|    spark hello|
|hellosparkhello|
+---------------+



3

In [51]:
# Maximum row size

from pyspark.sql.functions import *

df = spark.read.text("data/hello.txt")

df.select(size(split(df.value, "\s+")) \
        .name("numWords")) \
        .agg(max(col("numWords"))) \
        .collect()

#df.select(split(df.value, "\s+")).show()

#df.select(size(split(df.value, "\s+"))).show()

#df.select(size(split(df.value, "\s+")).name("numWords")).agg(max(col("numWords"))).first().__getitem__("max(numWords)")

#df.select(size(split(df.value, "\s+")).name("numWords")).agg(max(col("numWords")).name("max")).first().__getitem__("max(numWords)")

# This first maps a line to an integer value and aliases it as “numWords”, creating a new DataFrame. agg is called on that DataFrame to find the largest word count. The arguments to select and agg are both Column, we can use df.colName to get a column from a DataFrame. We can also import pyspark.sql.functions, which provides a lot of convenient functions to build a new Column from an old one.


[Row(max(numWords)=2)]

In [54]:
# Number of occurences of each word

df = spark.read.text("data/hello.txt")

wordCounts = df.select(explode(split(df.value, "\s+")).alias("word")).groupBy("word").count()

wordCounts.show()

# explode(): returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.

# Here, we use the explode function in select, to transform a Dataset of lines to a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file as a DataFrame of 2 columns: “word” and “count”.

# Notice that the above code basically implements a MapReduce flow

+-----+-----+
| word|count|
+-----+-----+
|hello|    3|
|Hello|    1|
|spark|    2|
|world|    1|
|  spa|    1|
+-----+-----+



Interoperating with RDDs

In [44]:
from pyspark.sql import Row
from pyspark.sql.functions import *

sc = spark.sparkContext

rdd = sc.textFile("data/logs.txt")

# Creates a DataFrame having a single column named "line"
df = rdd.map(lambda r: Row(r)).toDF(["line"])

errorsDF = df.filter(col("line").like("%ERROR%"))
#errorsDF = df.filter(col("line").like("ERROR%"))
#errorsDF = df.filter(col("line").like("ERROR_%MySQL%"))

# Counts all the errors
errorsDF.count()

# Counts errors mentioning MySQL
#errorsDF.filter(col("line").like("%MySQL%")).count()

# Fetches the MySQL errors as an array of strings
#errorsDF.filter(col("line").like("%MySQL%")).collect()

1

In [27]:
# Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes.

from pyspark.sql import Row

def schema_inference_example(spark: SparkSession, filePath) -> None:
  
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    
    linesRDD = sc.textFile(filePath)

    #linesRDD.foreach(print)

    partsRDD = linesRDD.map(lambda line: line.split(","))

    #partsRDD.foreach(print)

    peopleRDD = partsRDD.map(lambda p: Row(name=p[0], age=int(p[1])))

    #peopleRDD.foreach(print)

    # Infer the schema, and register the DataFrame as a table.
    
    peopleDF = spark.createDataFrame(peopleRDD)
    
    #peopleDF.show()

    #peopleDF.printSchema()

    peopleDF.createOrReplaceTempView("people")
    
    # SQL can be run over DataFrames that have been registered as a table.
    # The results of SQL queries are Dataframe objects.

    almost30DF = spark.sql("SELECT name FROM people WHERE age >= 28 AND age < 30")

    #almost30DF.show()
    
    # rdd (below) returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    # That is, it returns the content of the DataFrame as a pyspark.RDD of Row.
    
    almost30NamesRDD = almost30DF.rdd.map(lambda p: "Name: " + p.name)
    
    #almost30DF.foreach(print)

    #arrayAlmost30 = almost30NamesRDD.collect()

    #for name in arrayAlmost30:
    #    print(name)

schema_inference_example(spark, "data/people.txt")

In [29]:
# Programmatically Specifying the Schema

from pyspark.sql.types import StringType, StructType, StructField

def programmatic_schema_example(spark: SparkSession, filePath) -> None:
    
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    linesRDD = sc.textFile(filePath)
    partsRDD = linesRDD.map(lambda line: line.split(","))
    
    # Each line is converted to a tuple.
    peopleRDD = partsRDD.map(lambda p: (p[0], p[1].strip()))

    #peopleRDD.foreach(print)

    # The schema is encoded in a string.
    schemaString = "name age"

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    
    schema = StructType(fields)

    # Apply the schema to the RDD.
    peopleDF = spark.createDataFrame(peopleRDD, schema)

    # Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    resultsDF = spark.sql("SELECT name FROM people")

    resultsDF.show()


programmatic_schema_example(spark, "data/people.txt")

+-----+
| name|
+-----+
|Maria|
| João|
|Jorge|
| Rita|
|  Bob|
+-----+



Bibliography:
- https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes
- https://spark.apache.org/docs/latest/quick-start.html
- https://spark.apache.org/docs/3.1.3/api/python/ 
- Learning Spark Lightning-Fast Big Data Analysis, Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia. O'Reilley, 2015.
- https://sparkbyexamples.com/