## Modeling the dataset with Cassandra

### Imports

In [None]:
# os
import os

# cassandra driver
from cassandra.cluster import Cluster
from cassandra.cluster import SimpleStatement, ConsistencyLevel

# pandas
import pandas as pd

In [2]:
# directory holding the gowalla_*.nyc.csv input files loaded below
datadir = './data'

### Connecting to Cassandra

In [8]:
# Contact point for the driver: the CASSANDRA_NODE environment variable,
# falling back to a node on localhost.
CASSANDRA_NODES = [ os.getenv('CASSANDRA_NODE', '127.0.0.1') ]

cluster = Cluster(CASSANDRA_NODES)
# No default keyspace is selected, so the queries below use
# keyspace-qualified table names (lbsn.*).
session = cluster.connect()

In [355]:
# List the keyspaces known to the cluster metadata.
# hint: dir() lists all methods of a python object, handy for exploring
# cluster.metadata; e.g. the CQL type of a single column:
#cluster.metadata.keyspaces['lbsn'].tables['venues'].columns['vid'].cql_type
cluster.metadata.keyspaces.keys()

dict_keys(['system_auth', 'system', 'system_distributed', 'system_traces', 'system_schema', 'lbsn'])

In [56]:
# Start from scratch: drop the lbsn keyspace if the cluster metadata lists it.
if 'lbsn' in cluster.metadata.keyspaces.keys():
    session.execute("DROP KEYSPACE lbsn")

# or more simply, let Cassandra do the existence check
# (the trailing ';' only suppresses the notebook's output display)
session.execute("DROP KEYSPACE IF EXISTS lbsn");

In [57]:
cql_stmt = """
    CREATE KEYSPACE IF NOT EXISTS lbsn 
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' } 
"""
result = session.execute(cql_stmt)

In [58]:
cql_stmt = """
    CREATE TABLE lbsn.events (
      uid     bigint,
      ts      timestamp,
      lat     double,
      lon     double,
      vid     bigint,
      PRIMARY KEY (uid, ts)
    ) WITH CLUSTERING ORDER BY (ts DESC);
"""
result = session.execute(cql_stmt)

In [165]:
session.execute("DROP TABLE IF EXISTS lbsn.users");

cql_stmt = """
    CREATE TABLE lbsn.users (
      uid     bigint,
      fid      bigint,
      PRIMARY KEY (uid, fid)
    );
"""
result = session.execute(cql_stmt)

In [166]:
session.execute("DROP TABLE IF EXISTS lbsn.venues");

cql_stmt = """
    CREATE TABLE lbsn.venues (
      vid   bigint, 
      name  text,
      lat   double,
      lon   double,
      PRIMARY KEY (vid)
    );
"""
result = session.execute(cql_stmt)

In [268]:
filename = os.path.join(datadir, 'gowalla_events.nyc.csv')

# clear the table before inserting data from file
result = session.execute("TRUNCATE lbsn.events;")

# One statement for every row: the driver fills in the %s placeholders and
# quotes/escapes each value, instead of formatting raw file text into CQL.
query = SimpleStatement(
    "INSERT INTO lbsn.events (uid, ts, lat, lon, vid) VALUES (%s, %s, %s, %s, %s)",
    consistency_level=ConsistencyLevel.ONE)

# Cap on asynchronous inserts in flight before waiting for them, so the
# driver is not handed the whole file at once.
MAX_IN_FLIGHT = 500


def _wait_all(futures):
    """Wait for every pending insert, print failures, empty the list; return the failure count."""
    failed = 0
    for future in futures:
        try:
            future.result()  # re-raises the error of a failed insert
        except Exception as err:
            failed += 1
            print(err)
    futures.clear()
    return failed


pending = []
total = failed = 0
with open(filename, encoding='utf-8') as f:
    next(f, None)  # skip the header line (no-op on an empty file)
    for line in f:
        if not line.strip():
            continue  # ignore blank lines, e.g. at the end of the file
        # tab-separated fields: uid, ts, lat, lon, vid
        uid, ts, lat, lon, vid = line.strip().split('\t')
        # ts stays a string: it is sent as a quoted literal, as before
        pending.append(session.execute_async(
            query, (int(uid), ts, float(lat), float(lon), int(vid))))
        total += 1
        if len(pending) >= MAX_IN_FLIGHT:
            failed += _wait_all(pending)
# wait for the last batch so every insert is done before the next cell runs
failed += _wait_all(pending)

print("written {} of {} records".format(total - failed, total))

In [269]:
cql_stmt = """
    SELECT * from lbsn.events where uid=0 limit 5;
"""
rows = session.execute(cql_stmt)

print("Column names:")
print(rows.column_names)

print("\nRows:")
for row in rows:
    print(list(row))

Column names:
['uid', 'ts', 'lat', 'lon', 'vid']

Rows:
[0, datetime.datetime(2010, 10, 12, 0, 21, 28), 40.6438845363, -73.78280639649999, 23261]
[0, datetime.datetime(2010, 10, 11, 20, 21, 20), 40.74137425, -73.9881052167, 16907]
[0, datetime.datetime(2010, 10, 11, 20, 20, 42), 40.741388197, -73.98945450779999, 12973]
[0, datetime.datetime(2010, 10, 11, 0, 6, 30), 40.724910334499995, -73.9946207517, 341255]
[0, datetime.datetime(2010, 10, 10, 22, 0, 37), 40.729768314, -73.9985353275, 260957]


In [270]:
cql_stmt = """
    SELECT * from lbsn.events where uid=0 limit 5;
"""
rows = session.execute(cql_stmt)
df = pd.DataFrame(list(rows))
df

Unnamed: 0,uid,ts,lat,lon,vid
0,0,2010-10-12 00:21:28,40.643885,-73.782806,23261
1,0,2010-10-11 20:21:20,40.741374,-73.988105,16907
2,0,2010-10-11 20:20:42,40.741388,-73.989455,12973
3,0,2010-10-11 00:06:30,40.72491,-73.994621,341255
4,0,2010-10-10 22:00:37,40.729768,-73.998535,260957


In [291]:
cql_stmt = """
    SELECT * from lbsn.events ;
"""
rows = session.execute(cql_stmt)
df = pd.DataFrame(list(rows))
df.count()

uid    112383
ts     112383
lat    112383
lon    112383
vid    112383
dtype: int64

In [278]:
# Single-threaded operation: one synchronous INSERT per line. For a
# multi-threaded / concurrent variant look here:
# http://stackoverflow.com/questions/33153518/python-cassandra-driver-same-insert-performance-as-copy

def copy(session, keyspace, table, filename, sep='\t', header=True, cols=None):
    """Load a delimited text file into ``keyspace.table``, one INSERT per line.

    Args:
        session: connected Cassandra session; its cluster metadata supplies
            the table's column names and CQL types.
        keyspace, table: the target table.
        filename: path of the delimited file to load.
        sep: field separator (default: tab).
        header: if True, the first line lists the file's column names
            (translated through ``cols``); if False, every line is assumed
            to hold all table columns, in the order the table metadata
            lists them.
        cols: optional ``{'file_colname': 'table_colname'}`` translation;
            names missing from it are used unchanged.

    Raises:
        KeyError: if a column name is not a column of the table.

    Lines that Cassandra rejects are printed together with the error and
    skipped; a final ``written <ok> of <total> records`` line is printed.
    """
    # metadata for the cassandra columns in the given table
    columns = session.cluster.metadata.keyspaces[keyspace].tables[table].columns

    # types whose CQL literal is a single-quoted string
    # https://docs.datastax.com/en/cql/3.3/cql/cql_reference/valid_literal_r.html
    quoted_types = {'text', 'varchar', 'ascii', 'timestamp', 'date', 'time', 'inet'}
    # types for which an empty field is a legitimate (empty-string) value
    string_types = {'text', 'varchar', 'ascii'}

    def quote(value):
        # CQL escapes a single quote inside a string literal by doubling it
        # https://docs.datastax.com/en/cql/3.3/cql/cql_reference/escape_char_r.html
        return "'%s'" % value.replace("'", "''")

    def to_literal(cql_type, field):
        """Render one file field as a CQL literal for a column of type cql_type."""
        if cql_type in string_types:
            return quote(field)
        if not field.strip():
            # an empty field is not a valid number/timestamp/... literal
            return 'null'
        if cql_type in quoted_types:
            return quote(field)
        return field

    with open(filename, encoding='utf-8') as f:
        if header:
            header_line = next(f, None)
            if header_line is None:  # empty file: nothing to load
                print("written 0 of 0 records")
                return
            # translate the file's column names into table column names
            rename = cols or {}
            names = [rename.get(x, x) for x in header_line.strip().split(sep)]
        else:
            # no header: assume all table columns are in the file,
            # in the same order as in the cassandra table definition
            names = list(columns)

        unknown = [name for name in names if name not in columns]
        if unknown:
            raise KeyError('columns not found in {}.{}: {}'.format(keyspace, table, unknown))

        types = [columns[name].cql_type for name in names]
        column_list = ','.join(names)

        written = total = 0
        for line in f:
            if not line.strip():
                continue  # e.g. a trailing empty line
            # Only drop the line terminator: strip() would also remove trailing
            # empty fields and shift the remaining values out of their columns.
            fields = line.rstrip('\r\n').split(sep)
            values = ','.join(to_literal(t, v) for t, v in zip(types, fields))
            cql_stmt = "INSERT INTO {}.{} ({}) values ({})".format(
                keyspace, table, column_list, values)
            query = SimpleStatement(cql_stmt, consistency_level=ConsistencyLevel.ONE)
            total += 1
            try:
                session.execute(query)
            except Exception as err:
                # best effort: report the rejected statement and keep loading
                print(query, err)
            else:
                written += 1

    print("written {} of {} records".format(written, total))

In [280]:
# Reload lbsn.users from the users file with the copy() helper; with its
# defaults the file is tab-separated and its first line names the columns.
filename = os.path.join(datadir, 'gowalla_users.nyc.csv')

session.execute("TRUNCATE lbsn.users;")
copy(session,'lbsn','users', filename)

written 30042 of 30042 records


In [281]:
cql_stmt = """
    SELECT * from lbsn.users limit 5;
"""
rows = session.execute(cql_stmt)
df = pd.DataFrame(list(rows))
df

Unnamed: 0,uid,fid
0,29720,459
1,29720,1557
2,29720,29094
3,3236,29221
4,3236,66893


In [279]:
# Reload lbsn.venues from the venues file with the copy() helper; with its
# defaults the file is tab-separated and its first line names the columns.
filename = os.path.join(datadir, 'gowalla_venues.nyc.csv')

session.execute("TRUNCATE lbsn.venues;")
copy(session, 'lbsn','venues', filename)

written 17291 of 17291 records


In [267]:
cql_stmt = """
    SELECT * from lbsn.venues limit 5;
"""
rows = session.execute(cql_stmt)
df = pd.DataFrame(list(rows))
df

Unnamed: 0,vid,lat,lon,name
0,12775,40.756659,-73.925328,Sunswick
1,1198814,40.735735,-74.065016,Dosa House
2,635262,40.752903,-73.972764,Kushi Q
3,936046,40.752021,-74.04537,C-Town
4,587968,40.718743,-73.999507,Lunch Box Buffet


### Some queries

In [297]:
# how many events for user 0?
# count(1) returns a single row; its value is read through the row's `count` field

cql_stmt = """
    SELECT count(1) from lbsn.events where uid=0;
"""
rows = session.execute(cql_stmt)
rows[0].count

25

In [310]:
# the 10 most recent events of user 0
# (events are clustered by ts DESC, so LIMIT keeps the newest rows)

cql_stmt = """
    SELECT * from lbsn.events where uid=0 limit 10;
"""
rows = session.execute(cql_stmt)
pd.DataFrame(list(rows))

Unnamed: 0,uid,ts,lat,lon,vid
0,0,2010-10-12 00:21:28,40.643885,-73.782806,23261
1,0,2010-10-11 20:21:20,40.741374,-73.988105,16907
2,0,2010-10-11 20:20:42,40.741388,-73.989455,12973
3,0,2010-10-11 00:06:30,40.72491,-73.994621,341255
4,0,2010-10-10 22:00:37,40.729768,-73.998535,260957
5,0,2010-10-10 21:17:14,40.728527,-73.996868,1933724
6,0,2010-10-10 17:47:04,40.741747,-73.993421,105068
7,0,2010-10-09 23:51:10,40.734193,-74.004164,34817
8,0,2010-10-09 22:27:07,40.742512,-74.006031,27836
9,0,2010-10-09 21:39:26,40.742396,-74.007543,15079


In [311]:
# how many events for user 0 during Oct 8, 2010, UTC time?
# ts is the clustering column, so a range on it within the uid=0 partition is
# a valid restriction. The '+0000' suffix pins the bounds to UTC: a timestamp
# literal without a time zone is interpreted in the coordinator node's zone.

cql_stmt = """
    SELECT count(1) from lbsn.events where uid=0 and ts>='2010-10-08 00:00:00+0000' and ts<'2010-10-09 00:00:00+0000';
"""
rows = session.execute(cql_stmt)
rows[0].count

6

In [312]:
#how many users is user 0 following?
# counts the clustering rows (one per fid) in user 0's partition

cql_stmt = """
    SELECT count(1) from lbsn.users where uid=0;
"""
rows = session.execute(cql_stmt)
rows[0].count

149

In [316]:
# is user 0 following user 44?
# full primary-key lookup: the count is 1 if the (uid=0, fid=44) row exists, else 0
cql_stmt = """
    SELECT count(1) from lbsn.users where uid=0 and fid=44;
"""
rows = session.execute(cql_stmt)
rows[0].count>0

True

In [317]:
# is user 44 following user 0?
# the follower is the partition key (uid), the followed user the clustering key (fid)
cql_stmt = """
    SELECT count(1) from lbsn.users where uid=44 and fid=0;
"""
rows = session.execute(cql_stmt)
rows[0].count>0

True

In [319]:
# get the details of venue 23261
# primary-key lookup; list() turns the returned row into a plain list of its values
cql_stmt = """
    SELECT * from lbsn.venues where vid=23261;
"""
rows = session.execute(cql_stmt)
list(rows[0])

[23261, 40.6438845363, -73.78280639649999, 'JFK John F. Kennedy International']

In [352]:
# simple client-side join, fine for small collections:
# which venues were visited by user 0?

cql_stmt = """
    SELECT vid from lbsn.events where uid=0;
"""
rows = session.execute(cql_stmt)
venues = {x.vid for x in rows}

# Build the IN list explicitly: str(set())[1:-1] would yield "et(" for an
# empty set. With no visited venues, skip the query and show [].
cql_stmt = """
    SELECT name from lbsn.venues where vid in ({});
""".format(', '.join(str(v) for v in venues))
rows = session.execute(cql_stmt) if venues else []
[x.name for x in rows]

['New York Stock Exchange',
 'Times Square',
 'Empire State Building',
 'Flatiron Building ',
 'The High Line',
 'Trinity Church',
 'Washington Square Park ',
 'Shake Shack',
 'Madison Square Park',
 'Federal Hall National Memorial',
 'Magnolia Bakery, Downtown',
 'JFK John F. Kennedy International',
 'The Chelsea Market',
 'Chrysler Building',
 'Westville',
 'Ace Hotel',
 "John's of Bleecker Street",
 'Market',
 'The Half Pint',
 "Emilio's Ballato Restaurant",
 'Van Leeuwen Artisan Ice Cream',
 'Sicaffe',
 'New York University (NYU)']

### More analytics and data exploration

- secondary indexes
- materialized views
- sets, lists, and user-defined types (UDTs)
- build helper (lookup) tables


- advanced analytics in Apache Spark, via the Spark Cassandra Connector (Apache License)
- full-text indexing and search with Elasticsearch, via Elassandra (Apache License)
- full-text indexing with DataStax Enterprise Search (commercial license)
