In [112]:
import getpass
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *

In [114]:
# Display the SparkContext. `sc` is never created in this notebook — presumably it is
# injected by the PySpark-enabled kernel/shell; confirm before running elsewhere.
sc

<pyspark.context.SparkContext at 0x7f54c9555950>

In [115]:
# Wrap the existing SparkContext in a HiveContext; it is used below both to read
# the CSV and to run the Hive DDL statements.
hiveContext = HiveContext(sc)

In [5]:
# Display the HiveContext to confirm it was created.
hiveContext

<pyspark.sql.context.HiveContext at 0x7f54c8566b50>

In [5]:
# Shell escape: print the notebook's current working directory.
! pwd

/home/cloudera/Desktop/sparkETL


In [118]:
# Shell escape: print the working directory (repeats the check from an earlier cell).
! pwd

/home/cloudera/Desktop/sparkETL


In [120]:
# Load the loan CSV from the local filesystem through the spark-csv data source,
# with the header, inferschema and delimiter options set explicitly.
originalDf = (
    hiveContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("delimiter", ",")
    .load("file:///home/cloudera/Desktop/sparkETL/loan_small.csv")
)

In [121]:
# Check the Python type of the loaded object.
type(originalDf)

pyspark.sql.dataframe.DataFrame

In [122]:
# Number of rows loaded from the CSV.
originalDf.count()

4999

In [134]:
# originalDf.show(5)

In [136]:
# originalDf.printSchema()

In [137]:
# Project the eleven columns kept for the export, casting each one to an
# explicit type. The (column, type) pairs are listed in output-column order.
selectDf = originalDf.select(*(
    originalDf[columnName].cast(typeName)
    for columnName, typeName in [
        ('id', 'string'),
        ('member_id', 'string'),
        ('loan_amnt', 'float'),
        ('funded_amnt', 'float'),
        ('term', 'string'),
        ('int_rate', 'float'),
        ('home_ownership', 'string'),
        ('annual_inc', 'float'),
        ('issue_d', 'string'),
        ('zip_code', 'string'),
        ('addr_state', 'string'),
    ]
))

In [138]:
# Verify the column names and types of the projected DataFrame.
selectDf.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- funded_amnt: float (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)



In [139]:
# Row count of the projected DataFrame.
selectDf.count()

4999

In [140]:
# Peek at the first five rows as Row objects.
selectDf.head(5)

[Row(id=u'1077501', member_id=u'1296599', loan_amnt=5000.0, funded_amnt=5000.0, term=u' 36 months', int_rate=10.649999618530273, home_ownership=u'RENT', annual_inc=24000.0, issue_d=u'Dec-2011', zip_code=u'860xx', addr_state=u'AZ'),
 Row(id=u'1077430', member_id=u'1314167', loan_amnt=2500.0, funded_amnt=2500.0, term=u' 60 months', int_rate=15.270000457763672, home_ownership=u'RENT', annual_inc=30000.0, issue_d=u'Dec-2011', zip_code=u'309xx', addr_state=u'GA'),
 Row(id=u'1077175', member_id=u'1313524', loan_amnt=2400.0, funded_amnt=2400.0, term=u' 36 months', int_rate=15.960000038146973, home_ownership=u'RENT', annual_inc=12252.0, issue_d=u'Dec-2011', zip_code=u'606xx', addr_state=u'IL'),
 Row(id=u'1076863', member_id=u'1277178', loan_amnt=10000.0, funded_amnt=10000.0, term=u' 36 months', int_rate=13.489999771118164, home_ownership=u'RENT', annual_inc=49200.0, issue_d=u'Dec-2011', zip_code=u'917xx', addr_state=u'CA'),
 Row(id=u'1075358', member_id=u'1311748', loan_amnt=3000.0, funded_amn

In [141]:
# Inspect the raw `term` values of the first five rows
# (the recorded output shows a leading space, e.g. ' 36 months').
originalDf.select('term').head(5)

[Row(term=u' 36 months'),
 Row(term=u' 60 months'),
 Row(term=u' 36 months'),
 Row(term=u' 36 months'),
 Row(term=u' 60 months')]

In [12]:
# Base path used both as the Hive table LOCATION and as the Parquet output directory.
hdfsBasePath = "/user/loan/"

In [142]:
# Column definitions spliced into the CREATE TABLE statement, one
# "<name> <TYPE>" entry per line. issue_d is deliberately absent here: it is
# declared as the partition column in the DDL instead.
hiveColumns = "\n" + ",\n".join(
    "{0} {1}".format(columnName, hiveType)
    for columnName, hiveType in [
        ("id", "STRING"),
        ("member_id", "STRING"),
        ("loan_amnt", "FLOAT"),
        ("funded_amnt", "FLOAT"),
        ("term", "STRING"),
        ("int_rate", "FLOAT"),
        ("home_ownership", "STRING"),
        ("annual_inc", "FLOAT"),
        ("zip_code", "STRING"),
        ("addr_state", "STRING"),
    ]
) + "\n"

# Target Hive database and table names.
hiveDb, hiveTb = 'test_db', 'loan'


In [143]:
# Show the raw string (with embedded newlines) that is spliced into the DDL.
hiveColumns

'\nid STRING,\nmember_id STRING,\nloan_amnt FLOAT,\nfunded_amnt FLOAT,\nterm STRING,\nint_rate FLOAT,\nhome_ownership STRING,\nannual_inc FLOAT,\nzip_code STRING,\naddr_state STRING\n'

In [15]:
# DDL that creates the target Hive database when it does not exist yet.
createHiveDb = (
    "\n"
    "    CREATE DATABASE IF NOT EXISTS {0}\n"
).format(hiveDb)

In [147]:
# createHiveDb = """
#     CREATE DATABASE IF NOT EXISTS %s
# """ % hiveDb

In [148]:
# DDL for the external, issue_d-partitioned Parquet table located at hdfsBasePath.
#
# The statement is rendered in a single .format() pass with named fields.
# Formatting it twice (.format() followed by %) meant that a literal '%' in the
# database name, table name or location would be misread as a format directive
# by the second pass.
#
# NOTE(review): ROW FORMAT DELIMITED / FIELDS TERMINATED BY presumably have no
# effect on a table STORED AS PARQUET — confirm before removing them.
createHiveTb = """
    CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table}
    ({columns})
    PARTITIONED BY (issue_d STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS PARQUET
    LOCATION '{location}'
""".format(db=hiveDb, table=hiveTb, columns=hiveColumns, location=hdfsBasePath)

In [149]:
# Show the rendered CREATE TABLE statement for review. The parenthesized form
# prints the same text on Python 2 and is also valid syntax on Python 3.
print(createHiveTb)


    CREATE EXTERNAL TABLE IF NOT EXISTS test_db.loan
    (
id STRING,
member_id STRING,
loan_amnt FLOAT,
funded_amnt FLOAT,
term STRING,
int_rate FLOAT,
home_ownership STRING,
annual_inc FLOAT,
zip_code STRING,
addr_state STRING
)
    PARTITIONED BY (issue_d STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS PARQUET
    LOCATION '/user/loan/'



In [91]:
# Build one ALTER TABLE statement that registers a Hive partition for every
# distinct issue_d value present in selectDf.
partitionColumn = 'issue_d'
partitions = selectDf.select(partitionColumn).distinct().collect()

# Each collected Row holds a single field. NULLs are skipped: they cannot be
# rendered into a partition spec or directory name (the previous string
# concatenation raised TypeError on None).
partitionValues = [row[0] for row in partitions if row[0] is not None]
if not partitionValues:
    # An "ALTER TABLE ... ADD IF NOT EXISTS" without any PARTITION clause is
    # not a complete statement, so fail here with a clear message instead.
    raise ValueError("no non-null %s values found; nothing to register" % partitionColumn)

createHivePartition = """
    ALTER TABLE {0}.{1} ADD IF NOT EXISTS
""".format(hiveDb, hiveTb)

# Every value is passed as a %-format argument rather than concatenated into
# the template, so a literal '%' in a value or in hdfsBasePath is not misread
# as a format directive.
createHivePartition += "".join(
    "PARTITION (%s = '%s') LOCATION '%s%s=%s'\n"
    % (partitionColumn, partitionValue, hdfsBasePath, partitionColumn, partitionValue)
    for partitionValue in partitionValues
)


In [99]:
# Show the rendered ALTER TABLE statement for review. The parenthesized form
# prints the same text on Python 2 and is also valid syntax on Python 3.
print(createHivePartition)


    ALTER TABLE test_db.loan ADD IF NOT EXISTS
PARTITION (issue_d = 'Dec-2011') LOCATION '/user/loan/issue_d=Dec-2011'
PARTITION (issue_d = 'Nov-2011') LOCATION '/user/loan/issue_d=Nov-2011'
PARTITION (issue_d = 'Oct-2011') LOCATION '/user/loan/issue_d=Oct-2011'



In [101]:
# Re-check the row count of the projected DataFrame.
selectDf.count()

4999

In [151]:
# Run the CREATE DATABASE IF NOT EXISTS statement.
hiveContext.sql(createHiveDb)

DataFrame[result: string]

In [152]:
# Run the CREATE EXTERNAL TABLE IF NOT EXISTS statement for the partitioned table.
hiveContext.sql(createHiveTb)

DataFrame[result: string]

In [153]:
# Run the ALTER TABLE statement that adds one partition per distinct issue_d value.
hiveContext.sql(createHivePartition)

DataFrame[result: string]

In [154]:
# Write selectDf as Parquet under hdfsBasePath, partitioned by issue_d.
# mode("overwrite") replaces whatever already exists at that path.
(
    selectDf.write
    .partitionBy("issue_d")
    .mode("overwrite")
    .parquet(hdfsBasePath)
)

In [25]:
# Display the first seven rows in tabular form.
selectDf.show(7)

+-------+---------+---------+-----------+----------+--------+--------------+----------+--------+--------+----------+
|     id|member_id|loan_amnt|funded_amnt|      term|int_rate|home_ownership|annual_inc| issue_d|zip_code|addr_state|
+-------+---------+---------+-----------+----------+--------+--------------+----------+--------+--------+----------+
|1077501|  1296599|   5000.0|     5000.0| 36 months|   10.65|          RENT|   24000.0|Dec-2011|   860xx|        AZ|
|1077430|  1314167|   2500.0|     2500.0| 60 months|   15.27|          RENT|   30000.0|Dec-2011|   309xx|        GA|
|1077175|  1313524|   2400.0|     2400.0| 36 months|   15.96|          RENT|   12252.0|Dec-2011|   606xx|        IL|
|1076863|  1277178|  10000.0|    10000.0| 36 months|   13.49|          RENT|   49200.0|Dec-2011|   917xx|        CA|
|1075358|  1311748|   3000.0|     3000.0| 60 months|   12.69|          RENT|   80000.0|Dec-2011|   972xx|        OR|
|1075269|  1311441|   5000.0|     5000.0| 36 months|     7.9|   

In [155]:
# Connection settings for the MySQL export.
#
# The password is no longer stored in the notebook: it is read from the
# LOAN_JDBC_PASSWORD environment variable, and when that is unset or empty
# the user is prompted for it without echo. The URL and user keep their
# previous values as defaults and can be overridden through LOAN_JDBC_URL
# and LOAN_JDBC_USER.
jdbcUrl = os.environ.get("LOAN_JDBC_URL", "jdbc:mysql://localhost/test_db")
properties = {
    "user": os.environ.get("LOAN_JDBC_USER", "root"),
    "password": os.environ.get("LOAN_JDBC_PASSWORD") or getpass.getpass("MySQL password: "),
}

In [156]:
# Export selectDf over JDBC into the "loan" table at jdbcUrl;
# mode="overwrite" replaces the table's existing contents.
selectDf.write.jdbc(url=jdbcUrl, table="loan", mode="overwrite", properties=properties)