# Large file processor

Skip to the Usage section to see a worked session.

Note: If you see a "database is locked" error, it is because SQLite, being lightweight, allows only one writer at a time; this shouldn't be a problem with other databases.

## Setup

In [1]:
!pip list | grep 'pandas\|sqllite3\|threading'

pandas                        1.1.5         
pandas-datareader             0.9.0         
pandas-gbq                    0.13.3        
pandas-profiling              1.4.1         
sklearn-pandas                1.8.0         


In [2]:
# Input data lives in Google Drive (presumably mounted in Colab); list the folder
# to confirm products.csv.gz is present before loading it.
!ls 'drive/MyDrive/Postman'

'Large file processing - Assignment.docx'   products.csv.gz


In [3]:
import contextlib
import sqlite3
import threading

import pandas as pd

class DataProcessor:
    """
    Loads product CSV files into a SQLite table and queries/updates it.

    The table is ``products(name, sku, description)`` with an index on ``sku``.
    Every method opens its own connection to ``db_name`` and closes it before
    returning.
    """

    # SQLite builds older than 3.32.0 accept at most 999 bound parameters per
    # statement, so sku lookups are issued in batches below that limit.
    _SKU_BATCH_SIZE = 900

    def __init__(self, db_name='defaultDB.sqlite3'):
        """
        Creates a new db or connects to an existing one in working directory.

        INPUT:
        db_name: String for a db name (file path of the SQLite database).

        RETURN: None
        """
        self.db_name = db_name
        # SQLite allows a single writer at a time, so load_data's background
        # writers take this lock and run one after another instead of
        # competing for the database lock.
        self._write_lock = threading.Lock()

        with self._connect() as db:
            db.execute("CREATE TABLE IF NOT EXISTS products (name, sku, description)")
            db.execute("CREATE INDEX IF NOT EXISTS sku_idx ON products(sku)")
            records = db.execute("SELECT COUNT(1) FROM products").fetchone()[0]
        print('Connected to {name} with {records} records'.format(name=self.db_name, records=records))

    @contextlib.contextmanager
    def _connect(self, **connect_kwargs):
        """
        Yield a connection to self.db_name inside a transaction, then close it.

        sqlite3's own connection context manager only commits (or rolls back on
        error); it does not close the connection, so using it alone leaks one
        connection per call.

        INPUT:
        connect_kwargs: Extra keyword arguments for sqlite3.connect (e.g. timeout).
        """
        db = sqlite3.connect(self.db_name, **connect_kwargs)
        try:
            with db:
                yield db
        finally:
            db.close()

    def get_record_count(self):
        """
        Prints the rowcount of products db.

        INPUT: None
        RETURN: None
        """
        with self._connect() as db:
            records = db.execute("SELECT COUNT(1) FROM products").fetchone()[0]
        print('Products table has {records} records'.format(records=records))

    def load_data(self, path_to_file, chunksize=100000, block=False):
        """
        Read a CSV file in chunks and append it to the products table from a
        background thread. Works with compressed CSV as well.

        The file is opened in the calling thread, so a missing or unreadable
        file raises immediately. Chunks are then parsed and written one at a
        time by a single writer thread: only one chunk is held in memory, and
        writers started by other load_data calls on this instance wait for
        each other instead of failing with "database is locked". Each chunk
        is committed as soon as it is written; if a chunk fails, the chunks
        committed before it stay in the table.

        INPUT:
        path_to_file: String path for a csv file.
        chunksize: Integer for the number records per batch.
        block: Bool. If True, wait until every chunk is written and re-raise the
               first error; if False, return right away and print any error
               from the background thread.

        RETURN: None
        """
        reader = pd.read_csv(path_to_file, chunksize=chunksize)
        errors = []

        def write_chunks():
            try:
                with self._write_lock, self._connect(timeout=40) as db:
                    for chunk in reader:
                        chunk.to_sql("products", db, if_exists="append", index=False)
                        # Commit per chunk so other connections see progress.
                        db.commit()
            except Exception as exc:
                # Exceptions do not cross thread boundaries; keep it for the
                # caller (block=True) or surface it here (block=False).
                errors.append(exc)
                if not block:
                    print('Loading {0} failed: {1!r}'.format(path_to_file, exc))
            finally:
                reader.close()

        writer = threading.Thread(target=write_chunks)
        writer.start()

        if block:
            writer.join()
            if errors:
                raise errors[0]
            print('All tasks are done.')
        else:
            print('Tasks have been scheduled.')

    def update_products_by_sku(self, sku_key, record, all_occurances=False):
        """
        Update records based on sku value. If several records share the sku,
        the user chooses how to resolve the conflict.

        INPUT:
        sku_key: String key for a sku column.
        record: A tuple with name, sku and description values.
        all_occurances: Bool to control how to resolve key conflict. If True, all records with given sku will be overwritten.

        RETURN: None (prints the number of changed rows, or why nothing changed).
        """
        assert type(record) == tuple, 'records should be a tuple'

        query = """
                UPDATE products
                SET name=(?), sku=(?), description=(?)
                WHERE sku = (?)
                """
        with self._connect() as db:
            num_records = db.execute("SELECT COUNT(1) FROM products WHERE sku = (?)", [sku_key]).fetchone()[0]

            if num_records == 0:
                print('No records found with sku {0!r}'.format(sku_key))
            elif num_records == 1 or all_occurances:
                db.execute(query, (record[0], record[1], record[2], sku_key))
                print('Total changes:', db.total_changes)
            else:
                print('Key Conflict: This sku has {0} records. Set all_occurances=True or use update_products_by_rowid method'.format(num_records))

    def get_records_by_sku(self, sku_list):
        """
        Return a np.array with the records whose sku is in the given list.

        Sku values are passed as bound parameters, so values containing quotes
        or SQL fragments are matched literally.

        INPUT:
        sku_list: list of strings

        RETURN: np.array with product records, one row per record laid out as
                (rowid, name, sku, description)
        """
        assert type(sku_list) == list, 'Input should be a list'

        step = self._SKU_BATCH_SIZE
        # An empty list still runs one query (SQLite accepts "IN ()") so the
        # result keeps its four columns.
        batches = [sku_list[i:i + step] for i in range(0, len(sku_list), step)] or [[]]

        with self._connect() as db:
            frames = [
                pd.read_sql(
                    "SELECT rowid, * FROM products WHERE sku IN ({0})".format(', '.join('?' * len(batch))),
                    db,
                    params=batch,
                )
                for batch in batches
            ]

        # Drop empty batch results before concatenating so column dtypes are
        # not degraded by all-empty frames.
        matches = [frame for frame in frames if not frame.empty]
        return pd.concat(matches or frames[:1], ignore_index=True).values

    def update_products_by_rowid(self, records):
        """
        Update records based on rowid.

        INPUT:
        records: iterable of rows laid out as (rowid, name, sku, description),
                 e.g. the rows returned by get_records_by_sku.

        RETURN: None (prints the total changes to records).
        """
        query = """
                UPDATE products
                SET name=(?), sku=(?), description=(?)
                WHERE rowid=(?)
                """
        # Reorder each row so rowid lands on the WHERE placeholder.
        params = ((row[1], row[2], row[3], row[0]) for row in records)

        with self._connect() as db:
            db.executemany(query, params)
            print('Total changes:', db.total_changes)

    def get_aggregate_table(self, LIMIT=-1):
        """
        Returns a table with unique names and their row count, ordered by name.

        INPUT:
        LIMIT: Integer to get the desired number of records. Default (-1) returns the whole set.

        RETURN: Dataframe with two columns (name, 'no. of products')
        """
        query = """
                SELECT name, COUNT(1) as 'no. of products'
                FROM products
                GROUP BY name
                ORDER BY name
                LIMIT ?"""
        with self._connect() as db:
            return pd.read_sql(query, db, params=(int(LIMIT),))

## Usage

### Init

In [6]:
# Create test.db (or open it if it already exists) and connect to it
app = DataProcessor('test.db')

Connected to test.db with 0 records


In [7]:
# Print the current row count of the products table
app.get_record_count()

Products table has 0 records


### Consume the File

In [8]:
# Load the compressed CSV; block defaults to False, so this returns once the writes are scheduled
app.load_data('drive/MyDrive/Postman/products.csv.gz')

Tasks have been scheduled.


In [9]:
# Since the load above is non-blocking, the count only reflects the chunks written so far (200000 in this run)
app.get_record_count()

Products table has 200000 records


In [10]:
# Load the same file again with block=True, which waits until every record has been written
app.load_data('drive/MyDrive/Postman/products.csv.gz', block=True)

All tasks are done.


In [11]:
# The file has been ingested twice, so 2 * 500000 = 1000000 records
app.get_record_count()

Products table has 1000000 records


In [12]:
# Get the records for these skus; wrapped in a pandas DataFrame for display convenience
pd.DataFrame(app.get_records_by_sku(['step-onto', 'professional']))

Unnamed: 0,0,1,2,3
0,75,Bradley Hernandez,professional,General Republican white customer author. Dog ...
1,2765,Jeffrey Weeks,professional,Wall toward compare serious. Foreign one diffi...
2,4974,David Decker,professional,Talk full environment short your. Site campaig...
3,5246,Jacqueline Raymond,professional,Nice much each partner leg property. Language ...
4,8198,Anthony Washington,professional,Serious why customer people. Four find purpose...
...,...,...,...,...
347,982150,Dennis Taylor,professional,Very assume rich live. Or cost couple nature.\...
348,984217,Debra Jones,professional,Start bag wind pull strong. But between along ...
349,986071,Shelley Clark,professional,Note the fast. Enough owner part student beaut...
350,3,Theresa Taylor,step-onto,Choice should lead budget task. Author best me...


### Update with sku

In [13]:
# Because the file was ingested twice, there are two records for this sku
pd.DataFrame(app.get_records_by_sku(['step-onto']))

Unnamed: 0,0,1,2,3
0,3,Theresa Taylor,step-onto,Choice should lead budget task. Author best me...
1,500003,Theresa Taylor,step-onto,Choice should lead budget task. Author best me...


In [14]:
# With two records for one sku, we can choose to overwrite both or update by rowid;
# without all_occurances=True this call only reports the conflict
app.update_products_by_sku('step-onto', ('Theresa Taylor 2', 'step-onto', 'Choice should lead budget task.'))

Key Conflict: This sku has 2 records. Set all_occurances=True or use update_products_by_rowid method


In [15]:
# Set all_occurances=True to update every record with this sku
app.update_products_by_sku('step-onto', ('Theresa Taylor 2', 'step-onto', 'Choice should lead budget task.'), all_occurances=True)

Total changes: 2


In [16]:
# Both step-onto records should now carry the new name and description
pd.DataFrame(app.get_records_by_sku(['step-onto']))

Unnamed: 0,0,1,2,3
0,3,Theresa Taylor 2,step-onto,Choice should lead budget task.
1,500003,Theresa Taylor 2,step-onto,Choice should lead budget task.


### Update with rowid

In [17]:
# There are many records for just these two skus
pd.DataFrame(app.get_records_by_sku(['step-onto', 'professional']))

Unnamed: 0,0,1,2,3
0,75,Bradley Hernandez,professional,General Republican white customer author. Dog ...
1,2765,Jeffrey Weeks,professional,Wall toward compare serious. Foreign one diffi...
2,4974,David Decker,professional,Talk full environment short your. Site campaig...
3,5246,Jacqueline Raymond,professional,Nice much each partner leg property. Language ...
4,8198,Anthony Washington,professional,Serious why customer people. Four find purpose...
...,...,...,...,...
347,982150,Dennis Taylor,professional,Very assume rich live. Or cost couple nature.\...
348,984217,Debra Jones,professional,Start bag wind pull strong. But between along ...
349,986071,Shelley Clark,professional,Note the fast. Enough owner part student beaut...
350,3,Theresa Taylor 2,step-onto,Choice should lead budget task.


In [18]:
# Edit the fetched records in memory; rows are (rowid, name, sku, description),
# and here only the name of the first row is changed
foo = app.get_records_by_sku(['step-onto', 'professional'])
foo[0][1] = 'Bradley Test'

In [19]:
# Write the edited row back, matched on its rowid (first column)
app.update_products_by_rowid([foo[0]])

Total changes: 1


In [20]:
# The first record's name has been changed to 'Bradley Test'
pd.DataFrame(app.get_records_by_sku(['step-onto', 'professional']))

Unnamed: 0,0,1,2,3
0,75,Bradley Test,professional,General Republican white customer author. Dog ...
1,2765,Jeffrey Weeks,professional,Wall toward compare serious. Foreign one diffi...
2,4974,David Decker,professional,Talk full environment short your. Site campaig...
3,5246,Jacqueline Raymond,professional,Nice much each partner leg property. Language ...
4,8198,Anthony Washington,professional,Serious why customer people. Four find purpose...
...,...,...,...,...
347,982150,Dennis Taylor,professional,Very assume rich live. Or cost couple nature.\...
348,984217,Debra Jones,professional,Start bag wind pull strong. But between along ...
349,986071,Shelley Clark,professional,Note the fast. Enough owner part student beaut...
350,3,Theresa Taylor 2,step-onto,Choice should lead budget task.


In [21]:
# Or edit many records and write them back in one call. This is just a demo: apart from
# the edit above the values are unchanged, but every row is still rewritten and counted
app.update_products_by_rowid(foo)

Total changes: 352


### Aggregated table

In [22]:
# The aggregate table is computed on demand; here we ask for just 5 rows
app.get_aggregate_table(5)

Unnamed: 0,name,no. of products
0,Aaron Abbott,2
1,Aaron Acevedo,2
2,Aaron Acosta,8
3,Aaron Adams,12
4,Aaron Aguilar,2


In [23]:
# Or pass no limit to get every unique name with its count (222026 names in this run)
app.get_aggregate_table()

Unnamed: 0,name,no. of products
0,Aaron Abbott,2
1,Aaron Acevedo,2
2,Aaron Acosta,8
3,Aaron Adams,12
4,Aaron Aguilar,2
...,...,...
222021,Zoe Williams,2
222022,Zoe Wilson,2
222023,Zoe Wright,2
222024,Zoe Young,2
