# Spark SQL

We'll be working with JSON files containing data from the U.S. Census with the following columns:
- `age` - Age (years)
- `females` - Number of females
- `males` - Number of males
- `total` - Total number of individuals
- `year` - Year column (2010 for all rows of `census_2010.json`)
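To make the column list concrete, here is a minimal sketch of what a single record might look like. Spark's `read.json` expects one JSON object per line by default, so the sketch assumes the census files follow that layout. The values are the age-6 figures that appear in the query outputs later in this notebook (assuming those outputs are ordered by age); the exact key order and formatting inside the real file are assumptions.

```python
import json

# One record per line (JSON Lines), the default layout for Spark's read.json.
# The numbers are the age-6 values from the outputs later in this notebook;
# the precise formatting of the real file is assumed, not confirmed.
sample_line = (
    '{"age": 6, "females": 2007781, "males": 2093905, '
    '"total": 4101686, "year": 2010}'
)

record = json.loads(sample_line)

# The five columns described in the list above
print(sorted(record))

# For this row, total is the sum of males and females
print(record["males"] + record["females"] == record["total"])
```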

### Registering a DataFrame as a table

In [5]:
import findspark

In [9]:
findspark.init()

In [11]:
import pyspark

In [12]:
sc = pyspark.SparkContext()

In [13]:
from pyspark.sql import SQLContext

In [14]:
sqlCtx = SQLContext(sc)

In [15]:
df = sqlCtx.read.json("census_2010.json")

In [16]:
df.registerTempTable('census2010')

In [17]:
tables = sqlCtx.tableNames()

In [19]:
print(tables)

['census2010']


### Querying

In [20]:
sqlCtx.sql("SELECT age FROM census2010 LIMIT 20").show()

+---+
|age|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



### Filtering

In [21]:
query = 'select males,females from census2010 where age>5 and age<15'

In [22]:
sqlCtx.sql(query).show()

+-------+-------+
|  males|females|
+-------+-------+
|2093905|2007781|
|2097080|2010281|
|2101670|2013771|
|2108014|2018603|
|2114217|2023289|
|2118390|2026352|
|2132030|2037286|
|2159943|2060100|
|2195773|2089651|
+-------+-------+



### Mixing functionality

In [23]:
sqlCtx.sql('SELECT males, females FROM census2010').describe().show()

+-------+------------------+-----------------+
|summary|             males|          females|
+-------+------------------+-----------------+
|  count|               101|              101|
|   mean|1520095.3168316833|1571460.287128713|
| stddev|  818587.208016823|748671.0493484351|
|    min|              4612|            25673|
|    max|           2285990|          2331572|
+-------+------------------+-----------------+



### Registering multiple DataFrames as tables

In [24]:
census2010 = sqlCtx.read.json("census_2010.json")
census2010.registerTempTable('census2010')

In [25]:
census1980 = sqlCtx.read.json("census_1980.json")
census1980.registerTempTable('census1980')

In [26]:
census1990 = sqlCtx.read.json("census_1990.json")
census1990.registerTempTable('census1990')

In [27]:
census2000 = sqlCtx.read.json("census_2000.json")
census2000.registerTempTable('census2000')

In [28]:
sqlCtx.tableNames()

['census1980', 'census1990', 'census2000', 'census2010']

### Writing join queries to compare values across multiple tables

In [31]:
query = """
     select census2010.total, census2000.total
     from census2010
     inner join census2000
     on census2010.age=census2000.age
    """

In [32]:
sqlCtx.sql(query).show()

+-------+-------+
|  total|  total|
+-------+-------+
|4079669|3733034|
|4085341|3825896|
|4089295|3904845|
|4092221|3970865|
|4094802|4024943|
|4097728|4068061|
|4101686|4101204|
|4107361|4125360|
|4115441|4141510|
|4126617|4150640|
|4137506|4152174|
|4144742|4145530|
|4169316|4139512|
|4220043|4138230|
|4285424|4137982|
|4347028|4133932|
|4410804|4130632|
|4451147|4111244|
|4454165|4068058|
|4432260|4011192|
+-------+-------+
only showing top 20 rows
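Both result columns above are named `total`, which makes it hard to tell the two years apart. Column aliases fix that. The sketch below shows the aliasing pattern with Python's built-in `sqlite3` as a stand-in engine, so it runs without Spark; it is not the notebook's own setup. The totals are copied from the first three rows of the output above, and the ages 0 to 2 attached to them are an assumption based on row order.

```python
import sqlite3

# Stand-in engine: SQLite instead of Spark, used only to show the aliasing pattern.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE census2010 (age INTEGER, total INTEGER)")
conn.execute("CREATE TABLE census2000 (age INTEGER, total INTEGER)")

# Totals copied from the first three rows of the output above;
# the ages 0-2 are an assumption based on row order.
conn.executemany(
    "INSERT INTO census2010 VALUES (?, ?)",
    [(0, 4079669), (1, 4085341), (2, 4089295)],
)
conn.executemany(
    "INSERT INTO census2000 VALUES (?, ?)",
    [(0, 3733034), (1, 3825896), (2, 3904845)],
)

# Same join as above, with aliases so each column names its year
query = """
    select census2010.age,
           census2010.total as total_2010,
           census2000.total as total_2000
    from census2010
    inner join census2000
    on census2010.age = census2000.age
    order by census2010.age
"""

cursor = conn.execute(query)
columns = [d[0] for d in cursor.description]
rows = cursor.fetchall()

print(columns)
for row in rows:
    print(row)
```

The `as` aliases are standard SQL, so the same query string should also work when passed to `sqlCtx.sql(query)`.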



### Using SQL functions

In [33]:
query = """
     select sum(census2010.total), sum(census2000.total), sum(census1990.total)
     from census2010
     inner join census2000
     on census2010.age=census2000.age
     inner join census1990
     on census2010.age=census1990.age
    """

In [34]:
sqlCtx.sql(query).show()

+----------+----------+----------+
|sum(total)|sum(total)|sum(total)|
+----------+----------+----------+
| 312247116| 284594395| 254506647|
+----------+----------+----------+
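As a quick way to compare the three sums, the sketch below computes the percentage change between consecutive censuses in plain Python. The figures are copied from the output above; `pct_change` is a hypothetical helper introduced here for illustration. Because the query uses inner joins on `age`, the sums cover only ages present in all three tables, and they are only meaningful if each age appears once per table.

```python
# Sums copied from the query output above
totals = {2010: 312247116, 2000: 284594395, 1990: 254506647}


def pct_change(new, old):
    """Percentage change from old to new (hypothetical helper)."""
    return (new - old) / old * 100


growth_1990_2000 = pct_change(totals[2000], totals[1990])
growth_2000_2010 = pct_change(totals[2010], totals[2000])

print(f"1990 -> 2000: {growth_1990_2000:.2f}%")
print(f"2000 -> 2010: {growth_2000_2010:.2f}%")
```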

