# Pipeline
This notebook defines a sample data processing pipeline as a set of classes. To execute the pipeline, use a **driver** notebook; to test it, use a **tests** notebook. Splitting the code this way encourages clear code structure and good test coverage while still working within a notebook environment.

In [2]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import expr, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

## File Access
This class is the interface the pipeline uses to read and write DataFrames; subclasses override `read` and `write`. Routing all I/O through it lets unit tests control the input/output file systems and files, so tests can run on completely separate data in every environment.

In [4]:
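class FileAccess:
  """Interface for reading and writing the DataFrames used by the pipeline."""
  def __init__(self, spark: SparkSession, dbutils) -> None:
    self.spark = spark
    self.dbutils = dbutils
  def read(self, path: str) -> DataFrame:
    # Subclasses must override; fail loudly instead of silently returning None.
    raise NotImplementedError
  def write(self, path: str, df: DataFrame) -> None:
    # Subclasses must override.
    raise NotImplementedError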
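
As an illustration of how a tests notebook might take control of I/O, the sketch below defines an in-memory implementation of the interface. `InMemoryFileAccess` is a hypothetical name, not part of this notebook, and the small `FileAccess` stand-in is repeated (with loose `Any` types) only so the snippet runs on its own without a Spark installation.

```python
from typing import Any, Dict


class FileAccess:
    """Minimal stand-in for the interface above, so this sketch is self-contained."""

    def __init__(self, spark: Any, dbutils: Any) -> None:
        self.spark = spark
        self.dbutils = dbutils

    def read(self, path: str) -> Any:
        raise NotImplementedError

    def write(self, path: str, df: Any) -> None:
        raise NotImplementedError


class InMemoryFileAccess(FileAccess):
    """Hypothetical test double: keeps 'files' in a dict instead of touching storage."""

    def __init__(self, spark: Any = None, dbutils: Any = None) -> None:
        super().__init__(spark, dbutils)
        self.store: Dict[str, Any] = {}

    def read(self, path: str) -> Any:
        # Return whatever the test registered under this path.
        return self.store[path]

    def write(self, path: str, df: Any) -> None:
        # Remember the written object so the test can inspect it afterwards.
        self.store[path] = df


files = InMemoryFileAccess()
files.write('raw.csv', [(1, 'alice', 'x')])
print(files.read('raw.csv'))  # [(1, 'alice', 'x')]
```

In a real test the stored objects would be small DataFrames built by the test itself, so the pipeline never reads or writes production data.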

## DBFS File Access
This class is a concrete implementation of `FileAccess` for the production pipeline. It reads CSV input from DBFS and writes Delta output to DBFS, in both cases under `dbfs:/tmp/pipeline/`.

In [6]:
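class DbfsFileAccess(FileAccess):
  def read(self, path: str) -> DataFrame:
    # Explicit schema for the raw CSV input, so Spark does not have to infer types.
    schema = StructType([
      StructField('id', IntegerType(), False),
      StructField('name', StringType(), False),
      StructField('value', StringType(), False)
    ])
    # Use the session held by this instance rather than the notebook-global `spark`.
    return self.spark.read.format('csv').load(f'dbfs:/tmp/pipeline/{path}', schema=schema, header=True)
  def write(self, path: str, df: DataFrame) -> None:
    # Overwrite any existing output at this location with the new data in Delta format.
    df.write.format('delta').mode('overwrite').save(f'dbfs:/tmp/pipeline/{path}')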
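
The pipeline only ever passes short names such as `raw.csv` or `refined`; the class above turns them into full DBFS locations by prefixing a fixed root. The sketch below shows that mapping in isolation. `DBFS_ROOT` and `resolve_path` are hypothetical names introduced for illustration and do not exist in this notebook.

```python
# Assumption: this root mirrors the prefix hard-coded in DbfsFileAccess.
DBFS_ROOT = 'dbfs:/tmp/pipeline'


def resolve_path(name: str, root: str = DBFS_ROOT) -> str:
    """Hypothetical helper: map a pipeline-relative name to a full location."""
    return f'{root}/{name}'


print(resolve_path('raw.csv'))  # dbfs:/tmp/pipeline/raw.csv
print(resolve_path('refined'))  # dbfs:/tmp/pipeline/refined
```

Keeping the root in one place like this would also let a test implementation point the same names at a separate location.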

## Data Pipeline
The following class implements the data pipeline: it reads the raw CSV file, transforms it, and writes the result as the refined dataset.

In [8]:
class Pipeline:
  def __init__(self, spark: SparkSession, files: FileAccess) -> None:
    self.spark = spark
    self.files = files  
  
  def _transform(self, df: DataFrame) -> DataFrame:
    # Upper-case the name, flag each row as processed and stamp the processing time.
    return df.select(
      col('id'),
      expr('UPPER(name) AS name'),
      col('value'),
      expr('CAST(1 AS TINYINT) AS processed'),
      expr('CURRENT_TIMESTAMP() AS processed_time')
    )
  
  def run(self) -> None:
    df = self.files.read('raw.csv')
    df = self._transform(df)
    self.files.write('refined', df)
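
To make the effect of `_transform` concrete, here is a plain-Python model of what it does to a single row. This is only an illustration under stated assumptions: `transform_row` is a hypothetical helper, not part of the pipeline, and the timestamp is passed in explicitly, whereas Spark evaluates `CURRENT_TIMESTAMP()` itself.

```python
from datetime import datetime
from typing import Any, Dict


def transform_row(row: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Plain-Python model of the per-row effect of Pipeline._transform."""
    name = row['name']
    return {
        'id': row['id'],
        # UPPER(name): a NULL name stays NULL in Spark SQL.
        'name': name.upper() if name is not None else None,
        'value': row['value'],
        # CAST(1 AS TINYINT) AS processed
        'processed': 1,
        # CURRENT_TIMESTAMP() AS processed_time (supplied by the caller here)
        'processed_time': now,
    }


row = transform_row({'id': 1, 'name': 'alice', 'value': 'x'}, now=datetime(2024, 1, 1))
print(row['name'], row['processed'])  # ALICE 1
```

A tests notebook could assert the same expectations against the real DataFrame returned by `_transform`, ignoring the exact value of `processed_time`.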