## Data Transformation Task
<br><br>
The windfarm ANH01 has turbines (`wtgid`) ANH01A01, ANH01A02, ...,  ANH01C03.
<br><br>
Every ten minutes a csv file is generated with the columns: `id`, `timestamp`, `wtgid`, `siteid`, `tditag`, `value`, `insert_time`.
See, for example, "ANH01_2020-02-20-13-10-00.csv".
<br><br>
Different sensors (*tditag*: Tag1, ..., Tag6) produce values of different types (string, integer, datetime, float).
<br><br>
The data scientists in the operations department require easy access to the time series produced by the sensors `Tag3`, `Tag4` and `Tag5`.
<br><br>
You thus have to
1. filter the rows so that only these three tags remain, and
2. transform/transpose the data to have columns: `timestamp`, `siteid`, `wtgid`, `Tag3`, `Tag4`, `Tag5`

where the tag columns contain the corresponding value with an appropriate datatype.
<br><br>
Use pyspark, pandas or pure python.
There are some classes that you can use to read the files into a pyspark or pandas dataframe.
<br><br>
Your task is to write the transform method in `TransformJob`.

-----


In [1]:
from abc import ABC, abstractmethod
from collections import namedtuple
from typing import List
import pandas as pd
from pandas import DataFrame as PandasDataFrame
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as PysparkDataFrame


class InOutBase(ABC):
    """Interface every file handler has to implement: one reader, one writer."""

    @abstractmethod
    def read(self, path: str):
        """Load the data stored at ``path``; concrete handlers pick the return type."""

    @abstractmethod
    def write(self, data, path: str):
        """Persist ``data`` at ``path``; note that the data comes first."""


class PandasIO(InOutBase):
    """File handler that reads and writes csv files as pandas DataFrames."""

    def read(self, path) -> PandasDataFrame:
        """Parse the csv file at ``path`` with pandas' default settings."""
        frame = pd.read_csv(filepath_or_buffer=path)
        return frame

    def write(self, data: PandasDataFrame, path: str):
        """Store ``data`` as csv at ``path``, leaving the index out of the file."""
        data.to_csv(path_or_buf=path, index=False)


class SparkIO(InOutBase):
    """File handler that reads and writes csv files as pyspark DataFrames."""

    def __init__(self):
        super().__init__()
        # Reuse the active session if there is one, otherwise start a new one.
        self.spark = SparkSession.builder.getOrCreate()

    def read(self, path) -> PysparkDataFrame:
        """Load the csv data at ``path``, using the first line as column names."""
        csv_reader = self.spark.read.format("csv").option("header", "true")
        return csv_reader.load(path)

    def write(self, data: PysparkDataFrame, path: str):
        """Write ``data`` as csv with a header to ``path``, replacing existing output."""
        # Collapse to one partition before writing.
        single_partition = data.coalesce(1)
        csv_writer = single_partition.write.mode("overwrite").format("csv")
        csv_writer.save(path, header="true")


class NamedTupleIO(InOutBase):
    """Reads and writes csv files from/to lists of named tuples.

    The header line of the csv file corresponds to the field names of the
    named tuples. Values are split on plain commas; quoting is not interpreted.
    """

    def read(self, path) -> List[namedtuple]:
        """Return one named tuple per data line of the csv file at ``path``.

        All values are kept as strings. Lines whose number of comma-separated
        values differs from the number of header fields are skipped.
        """
        rows = []
        row_type = None
        with open(path, "r") as f:
            for idx, line in enumerate(f):
                if idx == 0:
                    # namedtuple() itself splits the header on commas/whitespace.
                    row_type = namedtuple("row", line)
                    continue
                values = line.rstrip().split(",")
                if len(values) == len(row_type._fields):
                    rows.append(row_type(*values))
        return rows

    def write(self, data: List[namedtuple], path: str):
        """Write ``data`` as csv to ``path``, using the field names as header.

        The parameter order follows ``InOutBase.write(data, path)``, which is
        how ``TransformJob.write`` calls its handler. Calls that still pass
        ``(path, data)`` positionally are recognised and swapped. An empty
        ``data`` produces an empty file, because no field names are available
        for a header.
        """
        if isinstance(data, str) and not isinstance(path, str):
            # Legacy positional order (path, data).
            data, path = path, data

        with open(path, "w") as f:
            if not data:
                return
            f.write(",".join(data[0]._fields) + "\n")
            for row in data:
                # str() lets rows carry non-string values such as numbers.
                f.write(",".join(map(str, row)) + "\n")

In [23]:
class TransformJob:
    """Read a raw sensor export, reshape it and optionally write the result.

    File access is delegated to the ``file_handler`` passed to the constructor.
    """

    # Sensors whose values become columns of the transformed data.
    TAGS = ("Tag3", "Tag4", "Tag5")
    # Columns that together identify one row of the transformed data.
    KEYS = ("timestamp", "siteid", "wtgid")

    def __init__(self, file_handler: "InOutBase"):
        self._file_handler = file_handler

    def read(self, path):
        """Read the data at ``path`` through the configured file handler."""
        return self._file_handler.read(path)

    def write(self, path, data):
        """Write ``data`` to ``path`` through the configured file handler."""
        # The handler interface takes (data, path), the reverse of this method.
        self._file_handler.write(data, path)

    @staticmethod
    def _infer_dtype(column):
        """Return ``column`` converted to the most specific dtype its values allow.

        Tried in order: numeric (integer or float), datetime, and otherwise the
        column is returned as is. This is a heuristic on the values themselves.
        """
        import warnings

        present = column.dropna()
        if present.empty:
            return column

        try:
            numeric = pd.to_numeric(present)
        except (ValueError, TypeError):
            pass
        else:
            if pd.api.types.is_integer_dtype(numeric) and len(present) < len(column):
                # Missing readings would turn integers into floats; the
                # nullable integer dtype keeps them integral.
                return pd.to_numeric(column).astype("Int64")
            return pd.to_numeric(column)

        try:
            with warnings.catch_warnings():
                # pandas may warn when it cannot infer one datetime format;
                # a failed parse is handled below, so the warning is noise.
                warnings.simplefilter("ignore", UserWarning)
                return pd.to_datetime(column)
        except (ValueError, TypeError, OverflowError):
            return column

    @staticmethod
    def transform(raw_data):
        """Filter the rows for ``TAGS`` and turn each tag into its own column.

        For a pandas DataFrame with the columns ``timestamp``, ``siteid``,
        ``wtgid``, ``tditag`` and ``value`` the result has the columns
        ``timestamp``, ``siteid``, ``wtgid``, ``Tag3``, ``Tag4``, ``Tag5``,
        with one row per distinct combination of the first three, sorted by
        them. If a tag occurs several times for the same combination, the last
        non-missing value wins. Each tag column gets an inferred dtype
        (see ``_infer_dtype``).

        NOTE: only pandas DataFrames are transformed so far. Any other input
        (e.g. pyspark DataFrames or lists of named tuples) is returned
        unchanged.
        """
        if not isinstance(raw_data, pd.DataFrame):
            return raw_data

        tags = list(TransformJob.TAGS)
        keys = list(TransformJob.KEYS)

        wanted = raw_data.loc[raw_data["tditag"].isin(tags), keys + ["tditag", "value"]]
        if wanted.empty:
            return pd.DataFrame(columns=keys + tags)

        wide = (
            wanted.groupby(keys + ["tditag"])["value"]
            .last()
            .unstack("tditag")
            # Guarantee all tag columns, in a fixed order, even if a tag is absent.
            .reindex(columns=tags)
            .reset_index()
        )
        wide.columns.name = None

        for tag in tags:
            wide[tag] = TransformJob._infer_dtype(wide[tag])
        return wide

    def execute(self, input_path, output_path):
        """Read ``input_path``, transform the data and write it to ``output_path``.

        If ``output_path`` is empty or ``None`` nothing is written and the
        transformed data is returned instead; otherwise the return value is
        ``None``.
        """
        input_data = self.read(input_path)
        output_data = self.transform(input_data)
        if not output_path:
            return output_data
        self.write(output_path, output_data)

In [24]:
# csv export to run the job against; the path is relative to the working directory.
input_path = 'ANH01_2020-02-20-13-40-00.csv'

In [32]:
# InOutBase is abstract, so instantiating it directly fails with
# "TypeError: Can't instantiate abstract class InOutBase with abstract methods read, write".
# A concrete handler is passed instead; PandasIO reads the csv into a pandas DataFrame.
df = TransformJob(PandasIO()).execute(input_path=input_path, output_path=None)