<h1 style="text-align:center"> INFO 323: Cloud Computing and Big Data</h1>
<h2 style="text-align:center"> College of Computing and Informatics</h2>
<h2 style="text-align:center">Drexel University</h2>

<h3 style="text-align:center"> Spark Reader and Writer</h3>
<h3 style="text-align:center"> Yuan An, PhD</h3>
<h3 style="text-align:center">Associate Professor</h3>

# Reader & Writer
1. Read from CSV files
1. Read from JSON files
1. Write DataFrame to files
1. Write DataFrame to tables

##### Methods
- DataFrameReader (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframereader#pyspark.sql.DataFrameReader" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html" target="_blank">Scala</a>): `csv`, `json`, `option`, `schema`
- DataFrameWriter (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframereader#pyspark.sql.DataFrameWriter" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameWriter.html" target="_blank">Scala</a>): `mode`, `option`, `parquet`, `format`, `saveAsTable`
- StructType (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=structtype#pyspark.sql.types.StructType" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/types/StructType.html" target="_blank">Scala</a>): `toDDL`

##### Spark Types
- Types (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=types#module-pyspark.sql.types" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/types/index.html" target="_blank">Scala</a>): `ArrayType`, `DoubleType`, `IntegerType`, `LongType`, `StringType`, `StructType`, `StructField`

In [None]:
%run ./Includes/Classroom-Setup

### Read from CSV files
Read from CSV with DataFrameReader's `csv` method and the following options:

Tab separator (`sep`), use first line as header (`header`), infer schema (`inferSchema`)

In [None]:
# Mount the data from AWS S3
aws_bucket_name = "info323-lecture"
mount_name = "aws_lecture"

access_key = dbutils.secrets.get(scope = "aws", key = "access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "secret-key")
encoded_secret_key = secret_key.replace("/", "%2F")

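# Unmount first only if the mount point already exists, so this cell can be re-run safely
mount_point = "/mnt/%s" % mount_name
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
  dbutils.fs.unmount(mount_point)

# Mount the bucket, passing the credentials inside the S3A URI
dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), mount_point)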
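
The cell above escapes only the `/` character in the secret key. A key may contain other characters that are reserved in a URI, such as `+`, so a more general option is to percent-encode the whole key with Python's standard library. The sketch below uses a made-up placeholder key (`example_secret_key` is hypothetical, not a real credential) and runs without Spark or `dbutils`.

```python
from urllib.parse import quote

# Hypothetical key for illustration only; never hard-code real credentials.
example_secret_key = "abc/DEF+ghi"

# safe="" makes quote() percent-encode "/" too, which it leaves alone by default.
encoded = quote(example_secret_key, safe="")

print(encoded)
```

The result could be used in place of `encoded_secret_key` when building the `s3a://` URI.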

In [None]:
display(dbutils.fs.ls("/mnt/%s" % mount_name))

path,name,size
dbfs:/mnt/aws_lecture/events-500k/,events-500k/,0
dbfs:/mnt/aws_lecture/events-500k-parquet/,events-500k-parquet/,0
dbfs:/mnt/aws_lecture/users-500k/,users-500k/,0
dbfs:/mnt/aws_lecture/users-500k-parquet/,users-500k-parquet/,0


In [None]:
usersCsvPath = "/mnt/training/ecommerce/users/users-500k.csv"

usersDF = (spark.read
  .option("sep", "\t")
  .option("header", True)
  .option("inferSchema", True)
  .csv(usersCsvPath))

usersDF.printSchema()

In [None]:
usersDF.show(2)

In [None]:
usersDF.count()

In [None]:
dbutils.fs.rm("/mnt/users-500k", True)

In [None]:
usersDF.write.option("header", True).option("delimiter", "\t").csv("/mnt/users-500k")

In [None]:
usersCsvPath_s3 = "s3://info323-lecture/users-500k/"

usersDF_aws = (spark.read
  .option("sep", "\t")
  .option("header", True)
  .option("inferSchema", True)
  .csv(usersCsvPath_s3))

usersDF_aws.printSchema()
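
With `inferSchema` enabled, Spark needs an extra pass over the data to decide each column's type. The plain-Python sketch below illustrates the idea on a tiny, made-up sample with the same three column names. It is an analogy under simplifying assumptions (only three types, hypothetical helper `infer_type`), not Spark's actual inference code.

```python
import csv
import io

# Made-up tab-separated sample; only the column names come from the lecture data.
sample = (
    "user_id\tuser_first_touch_timestamp\temail\n"
    "u1\t100\ta@example.com\n"
    "u2\t200\t\n"
)

def infer_type(values):
    """Return the narrowest of 'long', 'double', 'string' that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # nothing to learn from an all-empty column
    for name, cast in (("long", int), ("double", float)):
        try:
            for v in non_empty:
                cast(v)
            return name
        except ValueError:
            continue  # at least one value did not fit, try the next wider type
    return "string"

rows = list(csv.reader(io.StringIO(sample), delimiter="\t"))
header, data = rows[0], rows[1:]

# Every value in every column is inspected: this is the extra pass over the data.
columns = list(zip(*data))
inferred = {name: infer_type(col) for name, col in zip(header, columns)}

print(inferred)
```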

Manually define the schema by creating a `StructType` with column names and data types

In [None]:
from pyspark.sql.types import LongType, StringType, StructType, StructField

userDefinedSchema = StructType([
  StructField("user_id", StringType(), True),  
  StructField("user_first_touch_timestamp", LongType(), True),
  StructField("email", StringType(), True)
])

Read from CSV using this user-defined schema instead of inferring schema

In [None]:
usersDF = (spark.read
  .option("sep", "\t")
  .option("header", True)
  .schema(userDefinedSchema)
  .csv(usersCsvPath_s3))

Alternatively, define the schema using a DDL formatted string.

In [None]:
DDLSchema = "user_id string, user_first_touch_timestamp long, email string"

usersDF = (spark.read
  .option("sep", "\t")
  .option("header", True)
  .schema(DDLSchema)
  .csv(usersCsvPath_s3))
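
To make the role of the schema string concrete, here is a minimal plain-Python sketch of what "read with a schema" means: the column names and types come from the DDL string rather than from the data. The helpers `parse_flat_ddl` and `read_with_schema`, the `CASTS` mapping, and the sample rows are all hypothetical, and returning `None` for a value that does not fit is a choice made for this illustration only.

```python
import csv
import io

DDLSchema = "user_id string, user_first_touch_timestamp long, email string"

# Mapping from the DDL type names used above to Python casts (sketch-only assumption).
CASTS = {"string": str, "long": int}

def parse_flat_ddl(ddl):
    """Split a flat 'name type, name type' string into (name, type) pairs."""
    return [tuple(part.split()) for part in ddl.split(",")]

def read_with_schema(text, ddl, sep="\t", header=True):
    """Parse delimited text and cast each field according to the DDL string."""
    fields = parse_flat_ddl(ddl)
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    if header:
        rows = rows[1:]  # skip the header row; names are taken from the schema
    result = []
    for row in rows:
        record = {}
        for (name, type_name), raw in zip(fields, row):
            try:
                record[name] = CASTS[type_name](raw)
            except ValueError:
                record[name] = None  # value does not fit the declared type
        result.append(record)
    return result

# Made-up sample rows, including one timestamp that is not a number.
sample = (
    "user_id\tuser_first_touch_timestamp\temail\n"
    "u1\t100\ta@example.com\n"
    "u2\tnot-a-number\tb@example.com\n"
)

records = read_with_schema(sample, DDLSchema)
print(records)
```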

### Read from JSON files

Read from JSON with DataFrameReader's `json` method, letting Spark infer the schema from the data

In [None]:
eventsJsonPath = "/mnt/training/ecommerce/events/events-500k.json"

eventsDF = (spark.read
  .option("inferSchema", True)
  .json(eventsJsonPath))

eventsDF.printSchema()
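
By default, the `json` reader expects one complete JSON object per line. The sketch below parses two made-up records in that layout with the standard library and prints the nested shape of the first one. The field names are borrowed from the events schema used in this notebook; the values and the helper `describe` are hypothetical.

```python
import json

# Two made-up records, one JSON object per line.
sample = (
    '{"device": "macOS", "event_name": "main", "geo": {"city": "Philadelphia", "state": "PA"}}\n'
    '{"device": "Linux", "event_name": "cart", "geo": {"city": "Boston", "state": "MA"}}\n'
)

# Each non-blank line is parsed independently of the others.
records = [json.loads(line) for line in sample.splitlines() if line.strip()]

def describe(value):
    """Return the Python type name of a value, recursing into nested objects."""
    if isinstance(value, dict):
        return {k: describe(v) for k, v in sorted(value.items())}
    return type(value).__name__

shape = describe(records[0])
print(shape)
```

The nested `geo` object corresponds to a struct column, which is why the schemas in this notebook nest one `StructType` inside another.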

In [None]:
eventsDF.show(10)

In [None]:
#eventsJsonPath_aws = "/mnt/aws_anyuanay0511/lecture-data/events-500k"
eventsJsonPath_s3 = "s3://info323-lecture/events-500k/"

eventsDF_aws = (spark.read
  .option("inferSchema", True)
  .json(eventsJsonPath_s3))

eventsDF_aws.printSchema()

In [None]:
eventsDF_aws.count()

In [None]:
dbutils.fs.rm("/mnt/events-500k", True)

In [None]:
eventsDF.write.json("/mnt/events-500k")

Read data faster by supplying a `StructType` with the column names and data types, so that Spark does not have to scan the files to infer the schema

In [None]:
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField

userDefinedSchema = StructType([
  StructField("device", StringType(), True),  
  StructField("ecommerce", StructType([
    StructField("purchase_revenue_in_usd", DoubleType(), True),
    StructField("total_item_quantity", LongType(), True),
    StructField("unique_items", LongType(), True)
  ]), True),
  StructField("event_name", StringType(), True),
  StructField("event_previous_timestamp", LongType(), True),
  StructField("event_timestamp", LongType(), True),
  StructField("geo", StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True)
  ]), True),
  StructField("items", ArrayType(
    StructType([
      StructField("coupon", StringType(), True),
      StructField("item_id", StringType(), True),
      StructField("item_name", StringType(), True),
      StructField("item_revenue_in_usd", DoubleType(), True),
      StructField("price_in_usd", DoubleType(), True),
      StructField("quantity", LongType(), True)
    ])
  ), True),
  StructField("traffic_source", StringType(), True),
  StructField("user_first_touch_timestamp", LongType(), True),
  StructField("user_id", StringType(), True)
])

eventsDF = (spark.read
  .schema(userDefinedSchema)
  .json(eventsJsonPath_s3))

The Scala `StructType` method `toDDL` generates a DDL-formatted string for you.

In a Python notebook, run it in a `%scala` cell, then copy and paste the resulting string.

In [None]:
%scala
spark.read.parquet("/mnt/training/ecommerce/events/events.parquet").schema.toDDL
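
For readers who prefer to stay in Python, a similar string can be assembled from the dictionary form of a schema. The sketch below assumes a dictionary shaped like the one `df.schema.jsonValue()` returns; the helpers `type_to_ddl` and `to_ddl` are hypothetical, cover only the types used in this notebook, and are not guaranteed to match the Scala output character for character. The example dictionary is written by hand so the block runs without Spark.

```python
# Simple type names as they appear in the JSON form of a schema, mapped to DDL keywords.
SIMPLE_TYPES = {"string": "STRING", "long": "BIGINT", "integer": "INT", "double": "DOUBLE"}

def type_to_ddl(t):
    """Render one type from the JSON form of a schema as a DDL fragment."""
    if isinstance(t, str):
        return SIMPLE_TYPES[t]
    if t["type"] == "struct":
        inner = ", ".join(
            "`%s`: %s" % (f["name"], type_to_ddl(f["type"])) for f in t["fields"]
        )
        return "STRUCT<%s>" % inner
    if t["type"] == "array":
        return "ARRAY<%s>" % type_to_ddl(t["elementType"])
    raise ValueError("unsupported type: %r" % (t,))

def to_ddl(schema_json):
    """Render the top-level struct as a comma-separated column list."""
    return ",".join(
        "`%s` %s" % (f["name"], type_to_ddl(f["type"])) for f in schema_json["fields"]
    )

# Hand-written stand-in for df.schema.jsonValue(), reduced to three columns.
schema_json = {
    "type": "struct",
    "fields": [
        {"name": "device", "type": "string", "nullable": True, "metadata": {}},
        {"name": "geo", "type": {"type": "struct", "fields": [
            {"name": "city", "type": "string", "nullable": True, "metadata": {}},
            {"name": "state", "type": "string", "nullable": True, "metadata": {}},
        ]}, "nullable": True, "metadata": {}},
        {"name": "items", "type": {"type": "array", "containsNull": True, "elementType": {
            "type": "struct", "fields": [
                {"name": "quantity", "type": "long", "nullable": True, "metadata": {}},
            ]}}, "nullable": True, "metadata": {}},
    ],
}

ddl = to_ddl(schema_json)
print(ddl)
```

In a notebook with a DataFrame at hand, `to_ddl(eventsDF.schema.jsonValue())` would be the corresponding call.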

In [None]:
DDLSchema = "`device` STRING,`ecommerce` STRUCT<`purchase_revenue_in_usd`: DOUBLE, `total_item_quantity`: BIGINT, `unique_items`: BIGINT>,`event_name` STRING,`event_previous_timestamp` BIGINT,`event_timestamp` BIGINT,`geo` STRUCT<`city`: STRING, `state`: STRING>,`items` ARRAY<STRUCT<`coupon`: STRING, `item_id`: STRING, `item_name`: STRING, `item_revenue_in_usd`: DOUBLE, `price_in_usd`: DOUBLE, `quantity`: BIGINT>>,`traffic_source` STRING,`user_first_touch_timestamp` BIGINT,`user_id` STRING"

eventsDF = (spark.read
  .schema(DDLSchema)
  .json(eventsJsonPath_s3))

In [None]:
eventsDF.show(3)

### Write DataFrames to files

Write `usersDF` to parquet with DataFrameWriter's `parquet` method and the following configurations:

Snappy compression, overwrite mode

In [None]:
dbutils.fs.rm("/mnt/users-500k-parquet", True)

In [None]:
#usersOutputPath = workingDir + "/users.parquet"
usersOutputPath = "/mnt/users-500k-parquet"

(usersDF.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet(usersOutputPath)
)

In [None]:
# Write the events to the mounted S3 bucket as Parquet
eventsOutputPath = "/mnt/aws_lecture/events-500k-parquet"

(eventsDF.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet(eventsOutputPath)
)

### Write DataFrames to tables

Write `eventsDF` to a table using the DataFrameWriter method `saveAsTable`

This creates a global table that is registered in the metastore and persists across sessions, unlike the session-scoped temporary view created by the DataFrame method `createOrReplaceTempView`

In [None]:
eventsDF.write.mode("overwrite").saveAsTable("events_p")

This table was saved in the database created for you during classroom setup. The cell below prints the database name.

In [None]:
print(databaseName)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Ingesting Data Lab

Read in CSV files containing products data.
1. Read with infer schema
2. Read with user-defined schema
3. Read with DDL formatted string
4. Write to Delta

### 1. Read with infer schema
- View the first CSV file using DBUtils method `fs.head` with the filepath provided in the variable `singleProductCsvFilePath`
- Create `productsDF` by reading from CSV files located in the filepath provided in the variable `productsCsvPath`
  - Configure options to use first line as header and infer schema

In [None]:
# TODO
singleProductCsvFilePath = "/mnt/training/ecommerce/products/products.csv/part-00000-tid-1663954264736839188-daf30e86-5967-4173-b9ae-d1481d3506db-2367-1-c000.csv"

print(FILL_IN)

In [None]:
# TODO
productsCsvPath = "/mnt/training/ecommerce/products/products.csv"
productsDF = FILL_IN

productsDF.printSchema()

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [None]:
assert(productsDF.count() == 12)

### 2. Read with user-defined schema
Define schema by creating a `StructType` with column names and data types

In [None]:
# TODO
userDefinedSchema = FILL_IN

productsDF2 = FILL_IN

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [None]:
assert(userDefinedSchema.fieldNames() == ["item_id", "name", "price"])

In [None]:
from pyspark.sql import Row

expected1 = Row(item_id="M_STAN_Q", name="Standard Queen Mattress", price=1045.0)
result1 = productsDF2.first()

assert(expected1 == result1)

### 3. Read with DDL formatted string

In [None]:
# TODO
DDLSchema = FILL_IN

productsDF3 = FILL_IN

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [None]:
assert(productsDF3.count() == 12)

### 4. Write to Delta
Write `productsDF` to the filepath provided in the variable `productsOutputPath`

In [None]:
# TODO
productsOutputPath = workingDir + "/delta/products"
productsDF.FILL_IN

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [None]:
assert(len(dbutils.fs.ls(productsOutputPath)) == 5)

### Clean up classroom

In [None]:
%run ./Includes/Classroom-Cleanup
