In [143]:
#Imports:
import pandas as pd
import numpy as np
import altair as alt
import matplotlib.pyplot as plt
import time
import dask.dataframe as dd
import os


In [144]:
#Lets make our console outputs more nice, by applying some settings.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
alt.renderers.enable('notebook')
alt.data_transformers.enable('default', max_rows=None)
%matplotlib inline 
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 40)
pd.set_option('display.width', 1000)

RendererRegistry.enable('notebook')

DataTransformerRegistry.enable('default')

Our Dataset is about 3.6GB on disk, and "wc -l datafile.csv" tells us that we have 37 768 482 rows. Each row in the file takes up about 3 lines, so we have about 12589494 potential rows to access, in the dataset.

In [167]:
#our type dictionary

def daskcall(filePath):
    typeDict = {"Invoice/Item Number":"object"
    ,"Date":"object"
    ,"Store Number":"float64"
    ,"Store Name":"object"
    ,"Address":"object"
    ,"City":"object"
    ,"Zip Code":"object"
    ,"Store Location":"object"
    ,"County Number":"float64"
    ,"County":"object"
    ,"Category":"float64"
    ,"Category Name":"object"
    ,"Vendor Number":"int64"
    ,"Vendor Name":"object"
    ,"Item Number":"int64"
    ,"Item Description":"object"
    ,"Pack":"int64"
    ,"Bottle Volume (ml)":"float64"
    ,"State Bottle Cost":"object"
    ,"State Bottle Retail":"object"
    ,"Bottles Sold":"int64"
    ,"Sale (Dollars)":"object"
    ,"Volume Sold (Liters)":"float64"
    ,"Volume Sold (Gallons)":"float64"}

    #"./data/originalcsv/Iowa_Liquor_Sales.csv"
    #lets just read it in and see what happens.
    dfILS = dd.read_csv(filePath, engine="python", dtype=typeDict,
                     error_bad_lines=False,encoding="utf-8") #pandas args na_filter=True
    return dfILS

#dfILS.head(10)

In [168]:
dfILC = daskcall("./data/originalcsv/Iowa_Liquor_Sales.csv")
#forces dask to read in the file.
dfILS["Store Number"].describe().compute()

Skipping line 232331: unexpected end of data
Skipping line 232318: unexpected end of data


ValueError: Unable to convert column Store Number to type int64

In [171]:
#Does pandas on its own do the job?

typeDict = {"Invoice/Item Number":"object"
,"Date":"object"
,"Store Number":"float64"
,"Store Name":"object"
,"Address":"object"
,"City":"object"
,"Zip Code":"object"
,"Store Location":"object"
,"County Number":"float64"
,"County":"object"
,"Category":"float64"
,"Category Name":"object"
,"Vendor Number":"int64"
,"Vendor Name":"object"
,"Item Number":"int64"
,"Item Description":"object"
,"Pack":"int64"
,"Bottle Volume (ml)":"float64"
,"State Bottle Cost":"object"
,"State Bottle Retail":"object"
,"Bottles Sold":"int64"
,"Sale (Dollars)":"object"
,"Volume Sold (Liters)":"float64"
,"Volume Sold (Gallons)":"float64"}
    
dfILS = pd.read_csv("./data/originalcsv/Iowa_Liquor_Sales.csv", engine="python", dtype=typeDict,
                error_bad_lines=False,encoding="utf-8") 

dfILS.shape

KeyboardInterrupt: 

In [9]:
#function support:
#From: https://stackoverflow.com/questions/2081836/reading-specific-lines-only. Nice solution!
def yieldlines(thefile, whatlines):
  return (x for i, x in enumerate(thefile) if i in whatlines)

We see NaNs (that is fine). Lets try summarizing one column of the dataset (across all partitions).

Running: dfILS["Store Number"].describe().compute(), we get:

**"Skipping line 232331: unexpected end of data...Skipping line 232318"**

So around row 70k we have our first set of issues. I manually go through the file to see what is up.


In [54]:
ourFP = open("./data/originalcsv/Iowa_Liquor_Sales.csv")
linegen = yieldlines(ourFP,list(range(232210,232250)))

In [55]:
for line in linegen:
    print(line)


S12113900061,05/08/2013,2626,Hy-Vee Drugstore / University / DSM,4100 UNIVERSITY AVE,DES MOINES,50311,"4100 UNIVERSITY AVE

DES MOINES 50311

(41.600361, -93.673223)",77,Polk,1011200,STRAIGHT BOURBON WHISKIES,65,Jim Beam Brands,19068,Jim Beam,6,1750,$19.42,$29.14,6,$174.84,10.50,2.77

S27023400005,07/29/2015,3825,Shop N Save #2 / E 14th,1372 E 14TH ST,DES MOINES,50316,"1372 E 14TH ST

DES MOINES 50316

(41.604893, -93.600499)",77,Polk,1031080,VODKA 80 PROOF,300,Mccormick Distilling Company,36903,Mccormick Vodka,48,200,$1.13,$1.70,48,$81.60,9.60,2.54

S18556100019,04/23/2014,4912,Casey's General Store #2531 / Eldrid,"840, E LE CLAIRE RD",ELDRIDGE,52748,"840, E LE CLAIRE RD

ELDRIDGE 52748

(41.654692, -90.572547)",82,Scott,1031080,VODKA 80 PROOF,260,Diageo Americas,37994,Smirnoff Vodka 80 Prf,24,375,$4.75,$7.13,12,$85.56,4.50,1.19

S27807300002,09/09/2015,4463,Casey's General Store #3031 / Garner,145 US HWY 18 W,GARNER,50438,"145 US HWY 18 W

GARNER 50438

(43.105833, -93.603007)",41,Ha

Check: Are there premature EOFs before the true end of the file?

In [57]:
#So our EOF error is not correct. Loop would have terminated before it hit 37M lines.
def eoflinecheck():
    ourFP = open("./data/originalcsv/Iowa_Liquor_Sales.csv")
    i = 0
    for line in ourFP:
        i += 1
    ourFP.close()
    return i

eoflinecheck()

37768482

Hypothesis: our line parser goes out of wack due to control characters, or some bad formatting of lines. Each new record starts with an invoice number that starts with the letter S. Is every record on three lines? Lets find out

In [126]:
#For all of our lines, they are constrained within three lines. OK.
def threelinecheck(path):
    ourFP = open(path)
    result=True
    
    for i,line in enumerate(ourFP):
        if ((i > 0) and (i+1 % 3 == 0) and (line[0] != "S")):
            result=i
            break
    ourFP.close()
    return result

threelinecheck("./data/originalcsv/Iowa_Liquor_Sales.csv")


True

So, we have to manually clip out bad lines from the file. Are the lines that throw errors the actual lines causing the problem? Or is the parser out of phase, and drifting down the file until it finally crashes? I actually dont know! I looked at the offending lines and I didn't see any bad formatting. I am going to make a quarentine radius about the lines, and just cut out entires. We will lose more data this way, but looking for dirty control characters is an even worse way to spend my day.

In [123]:
#Lets design our clipping funciton.

#Signature: Path[String], Line Number[Integer], UpperRowRadius[Integer] --> Tuple(LB,UB)
#Purpose, for a given offending LINE, define a number of ROWS above it to clip, and then append the 
#rest of the file. This function will only perform the clip operation for ONE offending row.
#Use dask to iteratively go through the file, and find all of the anomolies. Dask will eventually crash on line numbers, 
#so lets just dump them.

#Note: lineNum arg is in lines, upRowRad is interpreted to be rows (lines/3). Not the same basis.
def clipappend(pathTo,fileName,lineNum,upRowRad):
    #if (not os.path.exists(pathto + "/temphold.csv")):
    ourFP = open(pathTo + "/" + fileName)
    tempFP = open(pathTo + "/temp.csv","w+") #will just create the file for us

    #endpoint set: 3,6,9,12 ... 3k
    #startpoint set: 1,4,7,10... 3k+1 I choose to calculate from ENDPOINT basis.
    
    shiftUp = 0
    #is our line Number an endLine for a given row?
    modShift = lineNum % 3 

    # If we are at first or middle position of a row.
    if (modShift == 1): #at start position
        shiftUp = 2 #to endpoint
    elif (modShift == 2): #middle position of a row
        shiftUp = 1 #to endpoint
    lowerBound = lineNum+shiftUp-((upRowRad+1)*3) #also remove the row itself from endpoint - thats why +1
    upperBound = lineNum+shiftUp 
    excludeRange = list(range(lowerBound+1,upperBound+1))
    #at this point, we have the cutoffs in the endpoint basis.
    for i,line in enumerate(ourFP):
        if (i in excludeRange):
            pass
        else:
            tempFP.write(line)
    ourFP.close()
    tempFP.close()
    #now overwrite our tempfile onto our original file. 
    os.remove(pathTo + "/" + fileName)
    os.rename(pathTo + "/temp.csv",pathTo + "/" + fileName)
    
    return (lowerBound,upperBound)

hold = clipappend("./data/originalcsv","splitfile.csv", 15, 1)


Now we have to reform datafile, using dask to tell us where the erroneous lines are! This takes a number of iterations by hand. For a set of numbers that are thrown (before crash) we calculate a range between the extremes, and extend it more to be safe.

Our start file is called "A.csv", our temporary file is called "temp.csv". We run dask on A until it crashes on a range of rows. We then call clipappend() on a, and give it the row number and epsilon rollback. When the function returns, A should be replaced by temp.csv, and temp.csv should be deleted. We keep moving forward until the entire file is cleaned.

## Basic Tests:

In [124]:
#First test it out on the splitfile: "head -n 30001 ./Iowa_Liquor_Sales.csv > splitfile.csv"
"
clipappend("./data/originalcsv","splitfile.csv", 20000, 2000)

(13998, 20001)

In [127]:
#Now for a real test: do we have a properly formatted file? Does
#every third line have an invoice number? or did it get garbled?
threelinecheck("./data/originalcsv/splitfile.csv")

True

In [128]:
#Final Test: Lets make a splitfile: "head -n 300 ./Iowa_Liquor_Sales.csv > splitfile.csv"
#and manually check in a text editor that the rows are formatted correctly.
clipappend("./data/originalcsv","splitfile.csv", 220, 10)

(189, 222)

In [129]:
#So the file looked OK in a text editor. What about reading with pandas?
testDF = pd.read_csv("./data/originalcsv/splitfile.csv", engine="python", dtype=typeDict,
                 encoding="utf-8")


In [147]:
#testDF #Looks good!

### Setting up the feed forward cleaning Method:

In [150]:
#Signature: startLine[Int], cutRows[Int], TupleDict[List], {Input to clipappend} -> NoneType
#Purpose: pass in the last line number, and a range of lines to cut out. Also pass in a list to modify.
#Call clip append. The output of clipappend is appended to the list, and mutation changes are maintained after the 
#function ends.
def forwardclean(startLine, cutRows, tupleList, pathTo, fileName):
    tupleList.append(clipappend(pathTo,fileName, startLine, cutRows))
    return 

tupleList = []
#"./data/originalcsv/A.csv"
#this will throw errors and eventually crash. That is fine.
def daskerrorlines(filePath):
    dfILS = daskcall(filePath)
    #forces dask to read in the file.
    dfILS["Store Number"].describe().compute()    


In [138]:
#Quick Test of the method:
# head -n 300 Iowa_Liquor_Sales.csv > splitfile.csv
forwardclean(100,10,tupleList,"./data/originalcsv","splitfile.csv")
forwardclean(100,10,tupleList,"./data/originalcsv","splitfile.csv")
forwardclean(100,10,tupleList,"./data/originalcsv","splitfile.csv")
tupleList

[(69, 102)]

#### This cell is called repeatedly, and iterations of cleanup are called below (by hand).

In [165]:
daskerrorlines("./data/originalcsv/A.csv")

Skipping line 232319: unexpected end of data
Skipping line 232363: unexpected end of data


ValueError: Unable to convert column Store Number to type int64

#### Cleanup Iterations are called below.

In [154]:
tupleList = []
#why 10 lines? The error we see at the lines, might not have started at the given line. There are so many different
#columns of different types, I assume the error can be 1000 lines up (type mismatch would occur). Ten felt about
#right, basically.
forwardclean(232331,10,tupleList,"./data/originalcsv","A.csv") #last line we see...and cut out 10 rows to start.

In [157]:
forwardclean(232331,10,tupleList,"./data/originalcsv","A.csv") 

In [159]:
forwardclean(232331,20,tupleList,"./data/originalcsv","A.csv") 

In [161]:
forwardclean(232331,50,tupleList,"./data/originalcsv","A.csv") 

In [162]:
forwardclean(232400,50,tupleList,"./data/originalcsv","A.csv") 

In [164]:
forwardclean(232500,60,tupleList,"./data/originalcsv","A.csv") 

### Time to stop.

This is a huge waste of time. I am going round in circles. On Kaggle I can load the entire dataset with pandas. It is clear that I should learn more about the dask framework before I start pulling ad hoc ideas and methods from it. END.