In [1]:
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

# Enable Plotly's notebook rendering (connected=True) so iplot() output shows inline.
init_notebook_mode(connected=True)

# One Hive-enabled Spark session for the whole notebook; the metastore is
# reached through the thrift URI configured below.
spark = (
    SparkSession.builder
    .appName("Project23")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Explicit schema for the source CSV so column types are fixed rather than inferred.
schema = StructType([
        StructField("year", IntegerType(), True),
        StructField("cname113", StringType(), True),
        StructField("cname", StringType(), True),
        StructField("state", StringType(), True),
        StructField("nod", IntegerType(), True),
        StructField("avg_age", FloatType(), True)
    ])

# Read the CSV into a DataFrame; mode="DROPMALFORMED" discards rows that do not
# match the schema above.
csv_df = spark.read.csv("/home/cloudera/data.csv", header=True, mode="DROPMALFORMED", schema=schema)

# Register the DataFrame as a temp view so it can be copied into Hive with SQL.
csv_df.createOrReplaceTempView("data")

# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE raises as soon
# as `info` already exists (e.g. on Restart & Run All).
spark.sql(
    "CREATE TABLE IF NOT EXISTS info "
    "(year STRING, cname113 STRING, cname STRING, state STRING, nod INT, avg_age FLOAT) "
    "STORED AS ORC"
)

# INSERT OVERWRITE replaces the table contents, so running the cell again does
# not append a second copy of every row. Columns are listed explicitly (instead
# of SELECT *) so the insert does not depend on column order, and `year` is cast
# because the table stores it as STRING while the CSV schema reads it as an integer.
spark.sql(
    "INSERT OVERWRITE TABLE info "
    "SELECT CAST(year AS STRING) AS year, cname113, cname, state, nod, avg_age "
    "FROM data"
)

# Show the catalog as a quick check that the table is there.
spark.sql("SHOW TABLES").show()

In [2]:
# Read the Hive table back as a DataFrame; the chart cells below all start from `df`.
info_table = "info"
df = spark.table(info_table)

In [None]:
# Total number of deaths per year, ordered by year, shown as a bar chart.
yearly_totals = (
    df.select("year", "nod")
    .groupBy("year")
    .sum("nod")
    .orderBy("year")
)

# Bring the (year, total) pairs back to the driver as plain tuples.
total_death_by_years = (
    yearly_totals.rdd
    .map(lambda row: (row[0], row[1]))
    .collect()
)

# The same result expressed in Spark SQL:
# total_death_by_years = spark.sql("SELECT year, SUM(nod) FROM info GROUP BY year ORDER BY year") \
#                             .rdd \
#                             .map(lambda x: (x[0], x[1])) \
#                             .collect()

# Split the pairs into the two axes Plotly expects.
years = [entry[0] for entry in total_death_by_years]
deaths = [entry[1] for entry in total_death_by_years]

bar_marker = dict(
    color='rgb(158,202,225)',
    line=dict(color='rgb(8,48,107)', width=0.5),
)
data = [go.Bar(x=years, y=deaths, marker=bar_marker)]
fig = dict(data=data, layout=dict(title="Total Deaths By Year"))
iplot(fig)

In [None]:
# Share of deaths by cause over the whole dataset (1999 to 2015), as a pie chart.
deaths_per_cause = (
    df.select("cname", "nod")
    .groupBy("cname")
    .sum("nod")
)

# Collect (cause, total) pairs on the driver as plain tuples.
all_causes_rate = (
    deaths_per_cause.rdd
    .map(lambda row: (row[0], row[1]))
    .collect()
)

# The same result expressed in Spark SQL:
# all_causes_rate = spark.sql("SELECT cname, SUM(nod) FROM info GROUP BY cname") \
#                         .rdd \
#                         .map(lambda x: (x[0], x[1])) \
#                         .collect()

cause_names = [entry[0] for entry in all_causes_rate]
cause_totals = [entry[1] for entry in all_causes_rate]

data = [go.Pie(labels=cause_names, values=cause_totals)]
fig = dict(data=data, layout=dict(title="Death Causes Rate"))
iplot(fig)

In [None]:
# Total deaths by state, drawn as a choropleth map of the USA.

# Schema of the lookup CSV that pairs each state name with its code.
us_schema = StructType([
        StructField("code", StringType(), True),
        StructField("state", StringType(), True),
    ])

# Read the state lookup CSV; rows that do not match the schema are dropped.
us_df = spark.read.csv("/home/cloudera/us.csv", header=True, mode="DROPMALFORMED", schema=us_schema)

# Sum deaths per state, then attach each state's code.
# - `sum` is pyspark.sql.functions.sum (brought in by the star import), not the
#   Python builtin, which is why it accepts a column name.
# - The join uses on="state" instead of `us_df.state == df.state`: the left side
#   of the join is the aggregated frame, not `df`, and joining by column name
#   also leaves a single `state` column in the result instead of two.
# - "year" and "cname" are kept in the select so the optional filters described
#   below can be inserted before the groupBy.
total_death_by_states = df.select("state", "nod", "year", "cname") \
                        .groupBy("state") \
                        .agg(sum("nod").alias("sod")) \
                        .join(us_df, on="state") \
                        .select("code", "sod") \
                        .rdd \
                        .map(lambda x: (x[0], x[1])) \
                        .collect()

# The same result expressed in Spark SQL:
# us_df.createOrReplaceTempView("us")
# spark.sql("SELECT u.code, sum(i.nod) AS sod FROM info AS i JOIN us AS u ON i.state = u.state GROUP BY u.code") \
#         .rdd \
#         .map(lambda x: (x[0], x[1])) \
#         .collect()

# To show a different slice, add a filter before the groupBy, for example:
# .where(df.year == "1999") \
# or .where(df.cname == "Suicide") \

# NOTE(review): locationmode='USA-states' presumably needs `code` to hold
# two-letter state abbreviations; confirm against us.csv.
state_codes = [entry[0] for entry in total_death_by_states]
state_totals = [entry[1] for entry in total_death_by_states]

data = [dict(
    type='choropleth',
    autocolorscale=True,
    locations=state_codes,
    z=state_totals,
    locationmode='USA-states',
    # White borders between states.
    marker=dict(line=dict(color='rgb(255,255,255)', width=2)),
    colorbar=dict(title="No of Death"),
)]
layout = dict(
    title='Number of Death by State (1999-2015)',
    geo=dict(
        scope='usa',
        projection=dict(type='albers usa'),
        showlakes=True,
        lakecolor='rgb(255, 255, 255)',
    ),
)

fig = dict(data=data, layout=layout)
iplot(fig)

In [None]:
# Ad-hoc catalog exploration: collect the registered SQL functions, print the
# description of `instr` untruncated, then list databases and tables.
fnt = spark.sql("show functions").collect()
spark.sql("describe function instr").show(truncate=False)
for statement in ("show databases", "show tables"):
    spark.sql(statement).show()