## Quick pipeline

Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

The spark context is defined, and the pyspark.sql.functions library is aliased as F, as is customary.

### Instructions

- Import the file 2015-departures.csv.gz to a DataFrame. Note the header is already defined.
- Filter the DataFrame to contain only flights with a duration over 0 minutes. Use the index of the column, not the column name (remember to use .printSchema() to see the column names / order).
- Add an ID column.
- Write the file out as a JSON document named output.json.
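
The four steps above can be checked without a Spark session. The sketch below is a plain-Python analogue, not the PySpark solution: the sample rows, the column layout (duration in position 3), and the helper name run_pipeline are all assumptions made for illustration. One difference worth noting: enumerate produces consecutive IDs, whereas monotonically_increasing_id() only guarantees unique, increasing values.

```python
import csv
import io
import json

# Hypothetical sample standing in for the departures file; the real
# column names and order should be checked with .printSchema().
SAMPLE_CSV = """date,flight,destination,duration
01/01/2015,0005,HNL,519
01/01/2015,0007,OGG,0
01/02/2015,0023,SFO,210
"""

def run_pipeline(text):
    """Read CSV text, keep rows with duration > 0, add an id, emit JSON lines."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)  # the first line holds the column names
    # Filter by column position (index 3), not by column name
    kept = [row for row in reader if int(row[3]) > 0]
    records = []
    for new_id, row in enumerate(kept):
        record = dict(zip(header, row))
        record["id"] = new_id
        records.append(record)
    # One JSON object per line
    return "\n".join(json.dumps(r) for r in records)

output = run_pipeline(SAMPLE_CSV)
print(output)
```

The one-object-per-line layout mirrors what Spark's JSON writer produces, although Spark writes a directory of part files rather than a single file.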

In [1]:
from pyspark.sql import SparkSession

In [2]:
# May take a little while on a local computer
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [5]:
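# Import the data to a DataFrame (this local run reads AA_DFW_2017_Departures_Short.csv.gz
# rather than the 2015-departures.csv.gz file named in the instructions)
departures_df = spark.read.csv('AA_DFW_2017_Departures_Short.csv.gz', header=True)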

In [6]:
# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

In [4]:
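# Import the functions module under its customary alias; without this
# import the cell fails with NameError: name 'F' is not defined
import pyspark.sql.functions as F

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')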

## Removing commented lines

Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

The spark context and the base CSV file (annotations.csv.gz) are available for you to work with. The col function is also available for use.

### Instructions

- Import the annotations.csv.gz file to a DataFrame and perform a row count. Specify a separator character of |.
- Query the data for the number of rows beginning with #.
- Import the file again to a new DataFrame, but specify the comment character in the options to remove any commented rows.
- Count the new DataFrame and verify the difference is as expected.
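
The count check in the last step comes down to simple arithmetic: the full count minus the comment count should equal the count after re-importing with the comment character set. A minimal plain-Python sketch of that check follows. The sample lines and the helper names count_comments and drop_comments are hypothetical, and the sketch only mimics the prefix test rather than calling Spark's CSV reader.

```python
# Hypothetical raw lines standing in for the contents of annotations.csv.gz
raw_lines = [
    "# this is a comment",
    "folder_a\tfile_1\t200\t300\tbreed_x",
    "#another comment",
    "folder_b\tfile_2\t500\t375\tbreed_y",
]

def count_comments(lines, comment_char="#"):
    """Count the lines that begin with the comment character."""
    return sum(1 for line in lines if line.startswith(comment_char))

def drop_comments(lines, comment_char="#"):
    """Keep only the lines that do not begin with the comment character."""
    return [line for line in lines if not line.startswith(comment_char)]

full_count = len(raw_lines)
comment_count = count_comments(raw_lines)
no_comments_count = len(drop_comments(raw_lines))

print("Full count: %d\nComment count: %d\nRemaining count: %d"
      % (full_count, comment_count, no_comments_count))
```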

In [7]:
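# Make col available; the exercise environment preloads it, a local notebook does not
from pyspark.sql.functions import col

# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')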
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

AnalysisException: 'Path does not exist: file:/home/iahmad/work/vodaphone/SPARK--SQL/labs/annotations.csv.gz;'

## Removing invalid rows

Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at minimum 5 tab-separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (\t) characters.

The DataFrame annotations_df is already available, with the commented rows removed. The pyspark.sql.functions library is available under the alias F. The initial number of rows available in the DataFrame is stored in the variable initial_count.

### Instructions

- Create a new variable tmp_fields using the annotations_df DataFrame column '_c0' splitting it on the tab character.
- Create a new column in annotations_df named 'colcount' representing the number of fields defined in the previous step.
- Filter out any rows from annotations_df containing fewer than 5 fields.
- Count the number of rows in the DataFrame and compare to the initial_count.
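
The split-count-filter logic in these steps can be tried on ordinary strings before running it on the DataFrame. The following is a minimal sketch in plain Python: the sample rows, the constant MIN_FIELDS, and the helper field_count are assumptions for illustration, standing in for F.split, F.size, and the DataFrame filter respectively.

```python
MIN_FIELDS = 5  # minimum number of tab-separated fields for a valid row

# Hypothetical single-column rows, as they would appear in '_c0'
rows = [
    "folder_a\tfile_1\t200\t300\tbreed_x",           # 5 fields: kept
    "folder_b\tfile_2\t640",                         # 3 fields: dropped
    "folder_c\tfile_3\t500\t375\tbreed_y\tbreed_z",  # 6 fields: kept
]

def field_count(row):
    """Number of fields after splitting the row on the tab character."""
    return len(row.split("\t"))

initial_count = len(rows)
valid_rows = [row for row in rows if field_count(row) >= MIN_FIELDS]
final_count = len(valid_rows)

print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
```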

In [None]:
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Keep only the rows containing at least 5 fields
annotations_df_filtered = annotations_df.filter(annotations_df["colcount"] >= 5)

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

## Splitting into columns

You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

You have the spark context and the latest version of the annotations_df DataFrame. pyspark.sql.functions is available under the alias F.

### Instructions

- Split the content of the '_c0' column on the tab character and store in a variable called split_cols.
- Add the following columns based on the first four entries in the variable above: folder, filename, width, height on a DataFrame named split_df.
- Add the split_cols variable as a column.
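
As a rough illustration of what these steps produce for a single row, the sketch below maps the first four split entries to named fields in plain Python. The sample line and the helper to_record are hypothetical. As with F.split, every resulting value is still a string, so width and height would need a cast before any numeric use.

```python
FIELD_NAMES = ("folder", "filename", "width", "height")

def to_record(line):
    """Split a tab-separated line and name its first four entries."""
    split_cols = line.split("\t")
    # zip stops once the four names are used up, so extra entries are ignored here
    record = dict(zip(FIELD_NAMES, split_cols))
    # Keep the full list as well, mirroring the split_cols column
    record["split_cols"] = split_cols
    return record

# Hypothetical row with four fixed fields followed by two extra entries
record = to_record("folder_a\tfile_1\t200\t300\tbreed_x\tbreed_y")
print(record)
```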

In [1]:
# Import the functions module under its customary alias; without this
# import the cell fails with NameError: name 'F' is not defined
import pyspark.sql.functions as F

# Split the content of _c0 on the tab character ('\t')
split_cols = F.split(annotations_df["_c0"], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

## Further parsing

You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

The spark context is available and pyspark.sql.functions is aliased as F. The types from pyspark.sql.types are already imported. The split_df DataFrame is as you last left it. Remember, you can use .printSchema() on a DataFrame in the console area to view the column names and types.

### Instructions

- Create a new function called retriever that takes two arguments, the split columns (cols) and the total number of columns (colcount). This function should return a list of the entries that have not been defined as columns yet (i.e., everything after the first four items in the list, from index 4 onward).
- Define the function as a Spark UDF, returning an Array of strings.
- Create the new column dog_list using the UDF and the available columns in the DataFrame.
- Remove the columns _c0, colcount, and split_cols.
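
Because a Spark UDF wraps an ordinary Python function, the slicing logic of retriever can be checked on a plain list before it is registered. The sketch below does only that: the sample values are hypothetical, and no Spark call is involved.

```python
def retriever(cols, colcount):
    """Return the entries from index 4 up to (not including) colcount."""
    return cols[4:colcount]

# Hypothetical split row: four fixed fields followed by two dog entries
cols = ["folder_a", "file_1", "200", "300", "breed_x", "breed_y"]
dog_list = retriever(cols, len(cols))

# A row with exactly four fields has nothing left over
empty_list = retriever(cols[:4], 4)

print(dog_list)
print(empty_list)
```

Passing colcount as the upper bound is redundant for a plain list, since cols[4:] returns the same entries, but it matches the two-argument signature the exercise asks for.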

In [None]:
def retriever(cols, colcount):
    # Return the dog data: every entry from index 4 up to colcount
    return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and colcount
split_df = split_df.drop('_c0', 'split_cols', 'colcount')