In [18]:
import sys
sys.path.append('..')
from pep_con_tool.data_processing.arrow_funtions import read_csv_file, read_csv_headers, pa_to_df
from pep_con_tool.data_processing.pd_functions import get_columns_containing_string, date_column_parser,fix_date_columns, set_col_cat
import janitor
from janitor import clean_names
import pandas as pd
import pyarrow as pa
from datetime import datetime

In [19]:
for col in df.columns:
    un = len(df[col].unique())
    if un < unique_n:
        #set dtype to category
        df[col] = df[col].astype('category')


In [20]:
df = pd.DataFrame({"col_1": ["a", "b", "b"], "col_2": ["a", "b", "c"]})
df = set_col_cat(df, 2)



In [22]:
df.dtypes

col_1    object
col_2    object
dtype: object

In [None]:
assert df["col_1"].dtype == "category"
assert df["col_1"][0] == "a"
assert df["col_1"][1] == "b"
assert df["col_1"][2] == "c"
assert df["col_2"].dtype != "category"
assert df["col_2"][0] == "a"
assert df["col_2"][1] == "b"
assert df["col_2"][2] == "c"


In [13]:
"""test for fix_date_columns"""
df = pd.DataFrame({"col_1": ["2020-01-01", "2020-01-02", "2020-01-03"], "col_2": ["2020-01-01", "2020-01-02", "2020-01-03"]})
df = fix_date_columns(df, "col", "%Y-%m-%d")


[]


In [16]:
get_columns_containing_string(df, "col")


['col_1', 'col_2']

In [11]:
df["col_1"][0]

'2020-01-01'

In [6]:
assert df["col_1"].dtype == "datetime64[ns]"
assert df["col_1"][0] == pd.Timestamp("2020-01-01")
assert df["col_1"][1] == pd.Timestamp("2020-01-02")
assert df["col_1"][2] == pd.Timestamp("2020-01-03")


AssertionError: 

## Read Data

### step 1: get headers, get data


In [2]:
files = "../data/csv_input/2020_01.csv"
filen = "../tests/test_files/test_csv_1.csv"

Filename = files
delimiter = ";"

In [3]:
# read csv file to pyarrow table, then convert to pandas df
def csv2df(Filename, delimiter=";"):
    """read csv file to pyarrow table, add filename to column"""
    headers = read_csv_headers(Filename, delimiter)
    pa_table = read_csv_file(Filename, headers, 1, delimiter, True)
    df = pa_to_df(pa_table)
    return df
    

In [4]:
# pandas show all columns in jupyter notebook

pd.options.display.max_columns = None




In [5]:

def get_columns_containing_string(df, string):
    """ get date columns from name """
    return [col for col in df.columns if string in col.lower()]

# parse date column function
def date_column_parser(df, date_column, date_format):
    """
    parse date column function
    df: pandas dataframe
    date_column: date column name
    date_format: date format
    """
    df[date_column] = pd.to_datetime(df[date_column], format=date_format, errors='coerce')
    return df  

def fix_date_columns(df, date_format, column_string):
    """ 
    fix date columns 
    df: pandas dataframe
    date_format: date format example: '%Y-%m-%d'
    column_string: string to search for in column names
    """

    date_columns = get_columns_containing_string(df, column_string)

    for col in date_columns:
        df = date_column_parser(df, col, date_format)
    
    return df

# function to add date and time columns
def add_date_time_columns(df, dt_col : list, remove_original : bool = True):
    """ 
    function to add date and time columns 
    df: pandas dataframe
    dt_col: list of tuples [(date_col, time_col), (date_col, time_col)]
    """

    for (date_column, time_column) in dt_col:
        df[(f"{date_column}_time")] = df[date_column] + (df[time_column] - datetime(1900, 1, 1))
    
    if remove_original:
        # remove dt_col columns
        for (date_column, time_column) in dt_col:
            df.drop([date_column, time_column], axis=1, inplace=True)
            
        
    return df


def set_col_cat(df, unique_n=10):
    """
    Looping through all the columns in the dataframe and checking the number of unique values in each
    column. 
    If the number of unique values is less than the unique_n argument
    it sets the column data type to category.
    """    
    
    for col in df.columns:
        un = len(df[col].unique())
        if un < unique_n:
            #set dtype to category
            df[col] = df[col].astype('category')

    return df
        

In [32]:
# read csv file to pandas df
df = csv2df(files)

# clean up data frame

#clean column names:
df = df.clean_names()

# formatting date
date_format = '%d.%m.%Y'
column_string = 'date'

df = fix_date_columns(df, date_format, column_string)

# formatting time columns
# set date_format function to format the time column.
date_format = '%H:%M:%S'
column_string = 'time'

df = fix_date_columns(df, date_format, column_string)

# combine date and time columns
dt_col = [('task_beginning_date', 'task_beginning_time'), ('task_ending_date',	'task_ending_time')]
df = add_date_time_columns(df, dt_col)

df = set_col_cat(df)



In [36]:
def data_pipe_pba(csv_file):
    """data cleaning pipeline for reading PBA csv files"""

    # formatting date
    date_format = "%d.%m.%Y"
    date_string = "date"

    # formatting time columns
    # set date_format function to format the time column.
    time_format = "%H:%M:%S"
    time_string = "time"

    # combine date and time columns
    dt_col = [
        ("task_beginning_date", "task_beginning_time"),
        ("task_ending_date", "task_ending_time"),
    ]

    df = (
        csv2df(csv_file)
        .pipe(clean_names)
        .pipe(fix_date_columns, date_format, date_string)
        .pipe(fix_date_columns, time_format, time_string)
        .pipe(add_date_time_columns, dt_col)
        .pipe(set_col_cat)
    )

    return df


In [None]:
# test for read_csv_file
def test_read_csv_file():
    """test for the read_csv_file function"""
    # read_csv_file("test_data/test_data.csv")
    headers = read_csv_headers(CSV_TEST_C_FILE_1, ",")
    assert read_csv_file(CSV_TEST_C_FILE_1, headers, 1, ",") == [
        {"a": "1", "b": "2", "c": "3"},
        {"a": "4", "b": "5", "c": "6"},
    ]
    

In [38]:
CSV_TEST_C_FILE_1 = "../tests/test_files/test_csv_1.csv"
headers = read_csv_headers(CSV_TEST_C_FILE_1, ",")

In [64]:
pyarrow_table = read_csv_file(CSV_TEST_C_FILE_1, headers, 1, ",")
assert type(pyarrow_table) == pa.Table
# pa table headers include filename
assert headers == list(pyarrow_table.column_names[:-1])

firstrow = {'header_0': 'row_0_col_0',
 'header_1': 'row_0_col_1',
 'header_2': 'row_0_col_2',
 'header_3': 'row_0_col_3',
 'header_4': 'row_0_col_4',
 'header_5': 'row_0_col_5',
 'header_6': 'row_0_col_6',
 'header_7': 'row_0_col_7',
 'header_8': 'row_0_col_8',
 'header_9': 'row_0_col_9',
 'header_10': 'row_0_col_10',
 'header_11': 'row_0_col_11',
 'header_12': 'row_0_col_12',
 'header_13': 'row_0_col_13',
 'header_14': 'row_0_col_14',
 'header_15': 'row_0_col_15',
 'filename': 'test_csv_1'}

assert pyarrow_table.to_pylist()[0] == firstrow

firstcol = ['row_0_col_0',
 'row_1_col_0',
 'row_2_col_0',
 'row_3_col_0',
 'row_4_col_0',
 'row_5_col_0',
 'row_6_col_0',
 'row_7_col_0',
 'row_8_col_0',
 'row_9_col_0',
 'row_10_col_0',
 'row_11_col_0',
 'row_12_col_0']

assert pyarrow_table.to_pydict()[headers[0]] == firstcol


In [63]:
# get first row of pa table


['row_0_col_0',
 'row_1_col_0',
 'row_2_col_0',
 'row_3_col_0',
 'row_4_col_0',
 'row_5_col_0',
 'row_6_col_0',
 'row_7_col_0',
 'row_8_col_0',
 'row_9_col_0',
 'row_10_col_0',
 'row_11_col_0',
 'row_12_col_0']

['a', 'b', 'b_1', 'c', 'c_1', 'c_1']