### Import Stuff

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
import pyspark.sql.types as T

### Create SQL Context

In [2]:
sqlContext = SQLContext(sc)

### Create Dataframes on Cassandra tables

In [3]:
stores_df = sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(keyspace="retail", table="stores")\
    .load()

In [4]:
receipts_by_store_date_df = sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(keyspace="retail", table="receipts_by_store_date")\
    .load()

### Create UDFs - These may not perform well, but are run only on the final results

In [5]:
concat = udf(lambda s1, s2: s1 + s2, T.StringType())

### Compute the sales by state
1. join receipts_by_store_date to store
2. group by state
3. sum by receipt_total
4. do a select to add the dummy column, rename columns, compute the region and round the totals

In [6]:
sales_by_state_df = receipts_by_store_date_df\
   .join(stores_df, stores_df.store_id == receipts_by_store_date_df.store_id )\
   .groupBy("state")\
   .sum("receipt_total")\
   .select(F.lit("dummy").alias("dummy"), "state", concat(F.lit("US-"),"state").alias("region"), F.col("SUM(receipt_total)").cast("Decimal(10,2)").alias("receipts_total"))

In [7]:
sales_per_store_by_zip_code = receipts_by_store_date_df\
   .groupBy("state")\
   .sum("receipt_total")\
   .select(F.lit("dummy").alias("dummy"), "state", concat(F.lit("US-"),"state").alias("region"), F.col("SUM(receipt_total)").cast("Decimal(10,2)").alias("receipts_total"))

### Save it

In [8]:
sales_by_state_df.write\
      .format("org.apache.spark.sql.cassandra")\
      .options(keyspace = "retail", table = "sales_by_state")\
      .save()

Py4JJavaError: An error occurred while calling o155.save.
: java.lang.UnsupportedOperationException: 'Writing to a non-empty Cassandra Table is not allowed.'
	at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:93)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


In [12]:
receipts_by_store_date_df.take(1)[0].customer.zip


u'98629'

In [14]:
receipts_by_store_date_df.show()
receipts_by_store_date_df.printSchema()



+--------+--------------------+-----------+--------------------+-------------+--------------------+------------------+
|store_id|        receipt_date|register_id|   receipt_timestamp|   receipt_id|            customer|     receipt_total|
+--------+--------------------+-----------+--------------------+-------------+--------------------+------------------+
|     293|2015-02-01 00:00:...|          1|2015-02-01 15:05:...|1444763487387|[Colton,Aguirre,1...|            929.84|
|     293|2015-02-01 00:00:...|          1|2015-02-02 00:34:...|1444763324380|[Russ,Roberson,14...|249.91000000000003|
|     293|2015-02-01 00:00:...|          3|2015-02-01 19:30:...|1444763602752|[Katherine,Pucket...|           1749.74|
|     293|2015-02-01 00:00:...|          7|2015-02-01 18:44:...|1444763496632|[Pat,Burt,815 Laf...|1039.8999999999999|
|     293|2015-02-01 00:00:...|          9|2015-02-02 02:24:...|1444763294711|[Jane,Shaw,416 Pi...|           1539.82|
|     293|2015-02-01 00:00:...|         10|2015-

In [35]:
receipts_by_store_date_df.rdd.map( lambda (store_id, receipt_date, register_id, receipt_timestamp, receipt_id, customer, receipt_total) : ((store_id, customer[5]),(receipt_total, 1))).reduceByKey(lambda x, y: x+y).sortBy(lambda (x,y): -y[0]).take(100)




[((222, u'53128'), (Decimal('3809.63'), 1)),
 ((311, u'10154'), (Decimal('3499.69'), 1)),
 ((384, u'84464'), (Decimal('3449.67'), 1)),
 ((301, u'84578'), (Decimal('3349.7'), 1)),
 ((296, u'50706'), (Decimal('3339.6499999999996'), 1)),
 ((206, u'08457'), (Decimal('3299.6800000000003'), 1)),
 ((141, u'10992'), (Decimal('3269.6499999999996'), 1)),
 ((56, u'40728'), (Decimal('3229.6699999999996'), 1)),
 ((383, u'37018'), (Decimal('3219.7200000000003'), 1)),
 ((362, u'40282'), (Decimal('3179.72'), 1)),
 ((56, u'56021'), (Decimal('3179.7'), 1)),
 ((146, u'93926'), (Decimal('3179.6899999999996'), 1)),
 ((35, u'12600'), (Decimal('3169.71'), 1)),
 ((152, u'75910'), (Decimal('3169.68'), 1)),
 ((236, u'11621'), (Decimal('3169.6499999999996'), 1)),
 ((396, u'78070'), (Decimal('3159.7'), 1)),
 ((395, u'45326'), (Decimal('3129.7200000000003'), 1)),
 ((89, u'66903'), (Decimal('3129.7200000000003'), 1)),
 ((156, u'65295'), (Decimal('3119.69'), 1)),
 ((86, u'14081'), (Decimal('3109.65'), 1)),
 ((86, u'