In [8]:
import pyspark

# Reuse the SparkContext already running in this kernel, or start one if none exists.
sc = pyspark.SparkContext.getOrCreate()

# Obtain the session through the builder rather than the SparkSession constructor:
# getOrCreate() returns the existing session when this cell is re-run, so the cell is
# idempotent and session-scoped state (e.g. temporary views) is not left behind on an
# orphaned session.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [10]:
import os

# The project root is taken to be the parent of the current working directory.
ROOT_PATH = os.path.join(os.getcwd(), "..")

# Input and output folders both sit directly under the project root.
DATA_PATH, OUTPUT_PATH = (
    os.path.join(ROOT_PATH, folder) for folder in ("data", "output")
)

## Load two CSV files into two DataFrames

In [18]:
def _load_csv(file_name):
    """Read a CSV file from DATA_PATH, using its first line as the header and
    letting Spark infer the column types."""
    return spark.read.csv(
        os.path.join(DATA_PATH, file_name),
        header=True,
        inferSchema=True,
    )


# Chapter-level table of contents
book_chapter_df = _load_csv("bookcontents.csv")
book_chapter_df.printSchema()
book_chapter_df.show()

# Section-level table of contents
section_df = _load_csv("sections.csv")
section_df.printSchema()
section_df.show()

# Sanity check: filter with a SQL-style predicate string
section_df.filter("Chapter == 5 and Section == 5.1").show()

root
 |-- Chapter: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Page: integer (nullable = true)

+-------+--------------------+----+
|Chapter|                Name|Page|
+-------+--------------------+----+
|      1|        Introduction|  11|
|      2|Basic Engineering...|  19|
|      3|Advanced Engineer...|  28|
|      4|     Hands On Course|  60|
|      5|        Case Studies|  62|
|      6|Best Practices Cl...|  73|
|      7|130+ Data Sources...|  77|
|      8|1001 Interview Qu...|  82|
|      9|Recommended Books...|  87|
+-------+--------------------+----+

root
 |-- Chapter: integer (nullable = true)
 |-- Section: double (nullable = true)
 |-- Section_Name: string (nullable = true)

+-------+-------+--------------------+
|Chapter|Section|        Section_Name|
+-------+-------+--------------------+
|      1|    1.1|What is this Cook...|
|      1|    1.2|Data Engineer vs ...|
|      1|    1.3|My Data Science P...|
|      1|    1.4|  Who Companies Need|
|      2| 

## Join sections with book contents by chapter

In [34]:
# Joining on an explicit equality condition keeps the Chapter column of both inputs.
same_chapter = book_chapter_df.Chapter == section_df.Chapter

# Keep only rows from chapters 1 or 2 whose chapter starts on page 11.
wanted_rows = (book_chapter_df.Chapter.isin(1, 2)) & (book_chapter_df.Page == 11)

(
    section_df
    .join(book_chapter_df, on=same_chapter, how="inner")
    .where(wanted_rows)
    .show()
)

+-------+-------+--------------------+-------+------------+----+
|Chapter|Section|        Section_Name|Chapter|        Name|Page|
+-------+-------+--------------------+-------+------------+----+
|      1|    1.1|What is this Cook...|      1|Introduction|  11|
|      1|    1.2|Data Engineer vs ...|      1|Introduction|  11|
|      1|    1.3|My Data Science P...|      1|Introduction|  11|
|      1|    1.4|  Who Companies Need|      1|Introduction|  11|
+-------+-------+--------------------+-------+------------+----+



## Access columns and apply expressions to them in a DataFrame

In [45]:
# Joining on the column *name* yields a single shared Chapter column.
book_contents_df = book_chapter_df.join(section_df, on="Chapter")
book_contents_df.show()

# Columns can be referenced from the DataFrame and combined into expressions.
(
    book_contents_df
    .select(
        book_contents_df["Chapter"],
        book_contents_df["Section"],
        (book_contents_df["Section"] + 1).alias("Inc_Section"),
    )
    .filter(book_contents_df["Chapter"] == 2)
    .show()
)

+-------+--------------------+----+-------+--------------------+
|Chapter|                Name|Page|Section|        Section_Name|
+-------+--------------------+----+-------+--------------------+
|      1|        Introduction|  11|    1.1|What is this Cook...|
|      1|        Introduction|  11|    1.2|Data Engineer vs ...|
|      1|        Introduction|  11|    1.3|My Data Science P...|
|      1|        Introduction|  11|    1.4|  Who Companies Need|
|      2|Basic Engineering...|  19|    2.1|       Learn To Code|
|      2|Basic Engineering...|  19|    2.2|Get Familiar With...|
|      2|Basic Engineering...|  19|    2.3|   Agile Development|
|      2|Basic Engineering...|  19|    2.4|Software Engineer...|
|      2|Basic Engineering...|  19|    2.5|Learn how a Compu...|
|      2|Basic Engineering...|  19|    2.6|Data Network Tran...|
|      2|Basic Engineering...|  19|    2.7|Security and Privacy|
|      2|Basic Engineering...|  19|    2.8|               Linux|
|      2|Basic Engineerin

## Group By and Aggregation Function

In [57]:
# Number of sections in each chapter.
book_contents_df.groupBy("Chapter").count().show()

+-------+-----+
|Chapter|count|
+-------+-----+
|      1|    4|
|      6|    3|
|      3|    8|
|      5|   37|
|      9|    3|
|      4|    7|
|      8|    1|
|      7|   19|
|      2|   11|
+-------+-----+



## Work with Spark SQL (up to this point we have worked directly with the Spark DataFrame API)

In [60]:
# Register the joined DataFrame under a name that SQL statements can refer to.
book_contents_df.createOrReplaceTempView("book_contents_view")

# Query the view with plain SQL and inspect the resulting DataFrame.
page_query = """SELECT * FROM book_contents_view WHERE Page = 11"""
view_df = spark.sql(page_query)
view_df.printSchema()
view_df.show()

root
 |-- Chapter: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Page: integer (nullable = true)
 |-- Section: double (nullable = true)
 |-- Section_Name: string (nullable = true)

+-------+------------+----+-------+--------------------+
|Chapter|        Name|Page|Section|        Section_Name|
+-------+------------+----+-------+--------------------+
|      1|Introduction|  11|    1.1|What is this Cook...|
|      1|Introduction|  11|    1.2|Data Engineer vs ...|
|      1|Introduction|  11|    1.3|My Data Science P...|
|      1|Introduction|  11|    1.4|  Who Companies Need|
+-------+------------+----+-------+--------------------+



In [62]:
# List what is registered in the session catalog; the temporary view
# "book_contents_view" created earlier in this notebook should appear here.
spark.catalog.listTables()

[Table(name='book_contents_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

## Create an RDD from a DataFrame

In [72]:
# The .rdd attribute exposes the DataFrame's contents as an RDD of Row objects.
book_contents_rdd = book_contents_df.rdd

# Preview the first ten records.
preview_rows = book_contents_rdd.take(10)
for record in preview_rows:
    print(record)
    

Row(Chapter=1, Name='Introduction', Page=11, Section=1.1, Section_Name='What is this Cookbook')
Row(Chapter=1, Name='Introduction', Page=11, Section=1.2, Section_Name='Data Engineer vs Data Scientist')
Row(Chapter=1, Name='Introduction', Page=11, Section=1.3, Section_Name='My Data Science Platform Blueprint')
Row(Chapter=1, Name='Introduction', Page=11, Section=1.4, Section_Name='Who Companies Need')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19, Section=2.1, Section_Name='Learn To Code')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19, Section=2.2, Section_Name='Get Familiar With Git')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19, Section=2.3, Section_Name='Agile Development')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19, Section=2.4, Section_Name='Software Engineering Culture')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19, Section=2.5, Section_Name='Learn how a Computer Works')
Row(Chapter=2, Name='Basic Engineering Skills', Page=19,

In [111]:
from pyspark.sql.types import Row, StructField, StringType, StructType, IntegerType

def compound_column(columns):
    """Collapse one book-contents record into a two-field Row.

    Args:
        columns: A positional record (e.g. a Row from ``book_contents_df.rdd``)
            whose first three positions are Chapter, Name and Page.

    Returns:
        Row with ``Chapter`` copied from position 0 and ``Compound`` set to
        ``"<Page>/<Name>"``.
    """
    # Page is an integer column, so it must be stringified before concatenation.
    dims = str(columns[2]) + "/" + columns[1]
    return Row(
        # Previously misspelled "Chaper"; the field name has to agree with the
        # "Chapter" field of the schema used to turn this RDD back into a DataFrame.
        Chapter=columns[0],
        Compound=dims
    )

# Transform every record with compound_column, then look at five of the results.
splitRDD = book_contents_rdd.map(compound_column)

sample_rows = splitRDD.take(5)
for record in sample_rows:
    print(record)

Row(Chaper=1, Compound='11/Introduction')
Row(Chaper=1, Compound='11/Introduction')
Row(Chaper=1, Compound='11/Introduction')
Row(Chaper=1, Compound='11/Introduction')
Row(Chaper=2, Compound='19/Basic Engineering Skills')


## Convert the RDD back to a DataFrame with a defined schema

In [86]:
# Explicit schema: two string columns, both declared non-nullable.
chapter_field = StructField("Chapter", StringType(), False)
compound_field = StructField("Compound", StringType(), False)
schema = StructType([chapter_field, compound_field])

# Build the DataFrame from the RDD using the schema above, then inspect it.
compound_df = spark.createDataFrame(splitRDD, schema=schema)
compound_df.printSchema()
compound_df.show()

root
 |-- Chapter: string (nullable = false)
 |-- Compound: string (nullable = false)

+-------+--------------------+
|Chapter|            Compound|
+-------+--------------------+
|      1|     11/Introduction|
|      1|     11/Introduction|
|      1|     11/Introduction|
|      1|     11/Introduction|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      2|19/Basic Engineer...|
|      3|28/Advanced Engin...|
|      3|28/Advanced Engin...|
|      3|28/Advanced Engin...|
|      3|28/Advanced Engin...|
|      3|28/Advanced Engin...|
+-------+--------------------+
only showing top 20 rows



## Counting words with Spark

In [113]:
# Read the file as an RDD of raw text lines
section_rdd = sc.textFile(os.path.join(DATA_PATH, "sections_wordcount.csv"))

# Preview a few elements -> each one is a plain text line, not a parsed record
for row in section_rdd.take(5):
    print(row)

print("====\n")

# Count how often each word occurs in the section-name field (third field of a line)
section_split_rdd = (
    section_rdd
    # maxsplit=2 keeps any commas that belong to the section name itself
    .map(lambda line: line.split(",", 2))
    # Skip blank or short lines instead of failing with IndexError on elements[2]
    .filter(lambda elements: len(elements) == 3)
    .map(lambda elements: elements[2].lower())
    # split() without arguments collapses repeated whitespace, so no empty "words"
    .flatMap(lambda sentence: sentence.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    # Descending order by key, i.e. by the word itself (not by its count)
    .sortByKey(False)
)

# section_split_rdd.collect()

1,1.1,What is this Cookbook
1,1.2,Data Engineer vs Data Scientist
1,1.3,My Data Science Platform Blueprint
1,1.4,Who Companies Need
2,2.1,Learn To Code
====



In [123]:
# Column layout for the (word, count) pairs; both fields use the default nullability.
word_field = StructField("Word", StringType())
count_field = StructField("Count", IntegerType())
word_count_schema = StructType([word_field, count_field])

word_count_df = spark.createDataFrame(section_split_rdd, schema=word_count_schema)
word_count_df.printSchema()

# Show the most frequent words first; this ordering applies to the preview only.
by_count_desc = word_count_df["Count"].desc()
word_count_df.orderBy(by_count_desc).show()

# Persist the DataFrame as CSV, replacing the output of any earlier run.
output_target = os.path.join(OUTPUT_PATH, "word_count.csv")
word_count_df.write.mode("overwrite").csv(output_target)

root
 |-- Word: string (nullable = true)
 |-- Count: integer (nullable = true)

+-----------+-----+
|       Word|Count|
+-----------+-----+
|       data|   48|
|    science|   40|
|        and|    9|
|         to|    4|
|     apache|    3|
|   platform|    3|
| processing|    3|
|          a|    3|
|    courses|    2|
|    twitter|    2|
|       with|    2|
|        the|    2|
|     amazon|    2|
|      learn|    2|
|development|    2|
|     google|    2|
|       from|    2|
|       nifi|    2|
|       what|    2|
|   zeppelin|    2|
+-----------+-----+
only showing top 20 rows

