This notebook shows how to use the Cassandra Python driver to read CSV data, parse it, and store it in a Cassandra database.

In [None]:
from cassandra.cluster import Cluster

# Cluster() is created with no arguments, i.e. the driver's default contact
# points; the session is then bound to the `openpayments` keyspace, which this
# notebook never creates (presumably it exists already — confirm before running).
cluster = Cluster()
session = cluster.connect(keyspace='openpayments')

In [24]:
# NOTE: this will drop the table and remove any data.
# IF EXISTS keeps this cell from failing on a fresh keyspace (first run, or
# Restart & Run All), where the table has not been created yet.
session.execute("DROP TABLE IF EXISTS payments")

In [25]:
# Schema for `payments`: General_Transaction_ID is the primary key and every
# column is declared as TEXT (values are inserted as strings by the loader).
create_payments_cql = """
  create table payments (
    General_Transaction_ID TEXT PRIMARY KEY, 
    Teaching_Hospital_ID TEXT, 
    Teaching_Hospital_Name TEXT, 
    Physician_Profile_ID TEXT, 
    Physician_First_Name TEXT, 
    Physician_Middle_Name TEXT, 
    Physician_Last_Name TEXT, 
    Recipient_Zip_Code TEXT, 
    Product_Indicator TEXT, 
    Name_of_Associated_Covered_Drug_or_Biological1 TEXT, 
    Name_of_Associated_Covered_Device_or_Medical_Supply1 TEXT, 
    Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name TEXT, 
    Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID TEXT, 
    Total_Amount_of_Payment_USDollars TEXT, 
    Date_of_Payment TEXT);
 """
session.execute(create_payments_cql)

We now process the data one row at a time: parse each record, extract the fields of interest, and insert them into the `payments` table. (At this stage we would normally also apply some transformations, e.g., converting the USD amounts to floats and the dates to timestamps.)

The mapping from ordinal in the CSV to column name is as follows:

    0: General_Transaction_ID, 
    5: Teaching_Hospital_ID, 
    6: Teaching_Hospital_Name, 
    7: Physician_Profile_ID, 
    8: Physician_First_Name, 
    9: Physician_Middle_Name, 
    10: Physician_Last_Name, 
    16: Recipient_Zip_Code, 
    27: Product_Indicator, 
    28: Name_of_Associated_Covered_Drug_or_Biological1, 
    38: Name_of_Associated_Covered_Device_or_Medical_Supply1, 
    43: Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name, 
    44: Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID, 
    48: Total_Amount_of_Payment_USDollars, 
    49: Date_of_Payment, 

We use the standard-library `csv` module to parse the data, so that quoted fields (for example, ones that contain commas) are handled correctly.

In [26]:
# Ordinal position of a column in the source CSV row -> name of the
# `payments` column it is stored in. Only the ordinals listed here are loaded;
# every other CSV column is ignored.
fields_mapping = dict([
    (0, "General_Transaction_ID"),
    (5, "Teaching_Hospital_ID"),
    (6, "Teaching_Hospital_Name"),
    (7, "Physician_Profile_ID"),
    (8, "Physician_First_Name"),
    (9, "Physician_Middle_Name"),
    (10, "Physician_Last_Name"),
    (16, "Recipient_Zip_Code"),
    (27, "Product_Indicator"),
    (28, "Name_of_Associated_Covered_Drug_or_Biological1"),
    (38, "Name_of_Associated_Covered_Device_or_Medical_Supply1"),
    (43, "Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name"),
    (44, "Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID"),
    (48, "Total_Amount_of_Payment_USDollars"),
    (49, "Date_of_Payment"),
])

In [34]:
def build_query(record):
    """Build a CQL INSERT statement for the ``payments`` table.

    Args:
        record: mapping of column name -> value. Column names are spliced into
            the statement verbatim, so they must be trusted identifiers (in
            this notebook they come from ``fields_mapping``). Each value is
            converted with ``str()`` and emitted as a CQL string literal.

    Returns:
        The statement as a string, e.g.
        ``INSERT INTO payments (a, b) VALUES ('x', 'O''Brien');``

    Raises:
        ValueError: if ``record`` is empty, since no valid INSERT exists for it.
    """
    if not record:
        raise ValueError("cannot build an INSERT for an empty record")
    columns = []
    literals = []
    # .items() works on both Python 2 and 3 (.iteritems() is Python 2 only).
    for col_name, col_value in record.items():
        columns.append(col_name)
        # CQL escapes a single quote inside a string literal by doubling it.
        # Deleting quotes instead would silently alter values such as O'Brien.
        literals.append("'" + str(col_value).replace("'", "''") + "'")
    return ("INSERT INTO payments (" + ", ".join(columns) +
            ") VALUES (" + ", ".join(literals) + ");")


In [35]:
import csv
from __future__ import print_function

data_file = '/Users/marco/Development/MSAN694/cassandra/cassandra-demo.csv'
count = 0
with open(data_file) as payments:
    for record in csv.reader(payments):
        if len(record) < 49:
            continue
        record_data = dict()
        for field_id, field_name in fields_mapping.iteritems():
            
            record_data[field_name] = record[field_id]
        query = build_query(record_data)
        try:
            session.execute(query)
            count += 1
        except Exception as ex:
            print("Error occurred while executing {query} near or at row [{line}]: {error}".format(
                    query=query, error=ex, line=count+1))
            # ignore and continue
print("Saved {} records".format(count));
        
        

Saved 4812 records


From the saved records we can extract useful information. For example, we can reduce the payments to a total amount per teaching hospital.

In [36]:
# Total payment amount per teaching hospital name. Rows with an empty or
# missing hospital ID are skipped, and amounts (stored as TEXT) are parsed
# with float().
query = "SELECT teaching_hospital_id, teaching_hospital_name, total_amount_of_payment_usdollars FROM payments"
resultset = session.execute(query)
redux = {}
for result in resultset:
    if not result.teaching_hospital_id:
        continue
    hospital = result.teaching_hospital_name
    redux[hospital] = redux.get(hospital, 0.0) + float(result.total_amount_of_payment_usdollars)


In [37]:
# Fixed-width report: a 60-character hospital-name column followed by a
# 20-character right-aligned amount column with two decimals.
header_fmt = "{:60}{:>20}"
for left, right in (('Hospital', 'USD ($)'), ('--------', '-------')):
    print(header_fmt.format(left, right))
for name, value in redux.items():
    print("{:60}{:20.2f}".format(name, value))

Hospital                                                                 USD ($)
--------                                                                 -------
Scripps Health                                                          20000.00
University Of Arkansas For Medical Sciences                             92514.00
University Of California Irvine                                          1374.25
St. LukeS Episcopal Hospital                                               42.48
Adventist Health System-Sunbelt Inc                                       289.49
Christiana Care Health Services Inc                                        71.30
Ohio Valley Medical Center                                               2856.00
Lester E Cox Medical Centers                                                0.02
County Of Los Angeles Auditor Controller                                  323.86
University Medical Center Of Southern Nevada                             1031.32
ChildrenS Hospital Of Pittsb