### Connect to mysql and check the content of the metastore DB

In [1]:
import pymysql
import pandas as pd

In [2]:
mysql_user, mysql_password = 'hiveuser', 'hivepassword'
cred = '?user=' + mysql_user + '&password=' + mysql_password
db = pymysql.connect(host="localhost",
                     user='hiveuser',
                     passwd='hivepassword',
                     database='metastore')

In [3]:
pd.read_sql("show tables;", db).tail(7)

Unnamed: 0,Tables_in_metastore
46,TBL_COL_PRIVS
47,TBL_PRIVS
48,TXNS
49,TXN_COMPONENTS
50,TYPES
51,TYPE_FIELDS
52,VERSION


In [4]:
pd.read_sql("select * from TBLS;", db)

Unnamed: 0,TBL_ID,CREATE_TIME,DB_ID,LAST_ACCESS_TIME,OWNER,RETENTION,SD_ID,TBL_NAME,TBL_TYPE,VIEW_EXPANDED_TEXT,VIEW_ORIGINAL_TEXT,LINK_TARGET_ID
0,1,1450179061,1,0,nasdag,0,1,master,MANAGED_TABLE,,,
1,2,1450179063,1,0,nasdag,0,2,salaries,MANAGED_TABLE,,,


### Load the same with Spark

In [5]:
import pyspark
sc = pyspark.SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df2 = sqlContext.read.format('jdbc').options(url='jdbc:mysql://localhost/metastore'+cred, dbtable='TBLS').load()
df2

DataFrame[TBL_ID: bigint, CREATE_TIME: int, DB_ID: bigint, LAST_ACCESS_TIME: int, OWNER: string, RETENTION: int, SD_ID: bigint, TBL_NAME: string, TBL_TYPE: string, VIEW_EXPANDED_TEXT: string, VIEW_ORIGINAL_TEXT: string, LINK_TARGET_ID: bigint]

In [6]:
df2.printSchema()

root
 |-- TBL_ID: long (nullable = false)
 |-- CREATE_TIME: integer (nullable = false)
 |-- DB_ID: long (nullable = true)
 |-- LAST_ACCESS_TIME: integer (nullable = false)
 |-- OWNER: string (nullable = true)
 |-- RETENTION: integer (nullable = false)
 |-- SD_ID: long (nullable = true)
 |-- TBL_NAME: string (nullable = true)
 |-- TBL_TYPE: string (nullable = true)
 |-- VIEW_EXPANDED_TEXT: string (nullable = true)
 |-- VIEW_ORIGINAL_TEXT: string (nullable = true)
 |-- LINK_TARGET_ID: long (nullable = true)



In [7]:
df2.collect()

[Row(TBL_ID=1, CREATE_TIME=1450179061, DB_ID=1, LAST_ACCESS_TIME=0, OWNER=u'nasdag', RETENTION=0, SD_ID=1, TBL_NAME=u'master', TBL_TYPE=u'MANAGED_TABLE', VIEW_EXPANDED_TEXT=None, VIEW_ORIGINAL_TEXT=None, LINK_TARGET_ID=None),
 Row(TBL_ID=2, CREATE_TIME=1450179063, DB_ID=1, LAST_ACCESS_TIME=0, OWNER=u'nasdag', RETENTION=0, SD_ID=2, TBL_NAME=u'salaries', TBL_TYPE=u'MANAGED_TABLE', VIEW_EXPANDED_TEXT=None, VIEW_ORIGINAL_TEXT=None, LINK_TARGET_ID=None)]

In [8]:
df2.toPandas()

Unnamed: 0,TBL_ID,CREATE_TIME,DB_ID,LAST_ACCESS_TIME,OWNER,RETENTION,SD_ID,TBL_NAME,TBL_TYPE,VIEW_EXPANDED_TEXT,VIEW_ORIGINAL_TEXT,LINK_TARGET_ID
0,1,1450179061,1,0,nasdag,0,1,master,MANAGED_TABLE,,,
1,2,1450179063,1,0,nasdag,0,2,salaries,MANAGED_TABLE,,,


### Write into a csv hdfs file

In [9]:
df2.write.format("com.databricks.spark.csv").option("header","true").option("delimiter",";").save("test0_metastore_tbls.csv", mode="Overwrite")
!hdfs dfs -ls test0_metastore_tbls.csv
!hdfs dfs -cat test0_metastore_tbls.csv/*

Found 2 items
-rw-r--r--   1 nasdag supergroup          0 2015-12-15 03:34 test0_metastore_tbls.csv/_SUCCESS
-rw-r--r--   1 nasdag supergroup        266 2015-12-15 03:34 test0_metastore_tbls.csv/part-00000
TBL_ID;CREATE_TIME;DB_ID;LAST_ACCESS_TIME;OWNER;RETENTION;SD_ID;TBL_NAME;TBL_TYPE;VIEW_EXPANDED_TEXT;VIEW_ORIGINAL_TEXT;LINK_TARGET_ID
1;1450179061;1;0;nasdag;0;1;master;MANAGED_TABLE;null;null;null
2;1450179063;1;0;nasdag;0;2;salaries;MANAGED_TABLE;null;null;null


In [10]:
df3 = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("delimiter",";").load("test0_metastore_tbls.csv")
df3.registerTempTable("tbls")
sqlContext.sql("SELECT * FROM tbls").collect()

[Row(TBL_ID=u'1', CREATE_TIME=u'1450179061', DB_ID=u'1', LAST_ACCESS_TIME=u'0', OWNER=u'nasdag', RETENTION=u'0', SD_ID=u'1', TBL_NAME=u'master', TBL_TYPE=u'MANAGED_TABLE', VIEW_EXPANDED_TEXT=u'null', VIEW_ORIGINAL_TEXT=u'null', LINK_TARGET_ID=u'null'),
 Row(TBL_ID=u'2', CREATE_TIME=u'1450179063', DB_ID=u'1', LAST_ACCESS_TIME=u'0', OWNER=u'nasdag', RETENTION=u'0', SD_ID=u'2', TBL_NAME=u'salaries', TBL_TYPE=u'MANAGED_TABLE', VIEW_EXPANDED_TEXT=u'null', VIEW_ORIGINAL_TEXT=u'null', LINK_TARGET_ID=u'null')]

### and now parquet

In [11]:
df3.write.save("test0_metastore_tbls.parquet", format="parquet", mode="Overwrite")
!hdfs dfs -ls test0_metastore_tbls.parquet
df3.printSchema()

Found 5 items
-rw-r--r--   1 nasdag supergroup          0 2015-12-15 03:34 test0_metastore_tbls.parquet/_SUCCESS
-rw-r--r--   1 nasdag supergroup       1183 2015-12-15 03:34 test0_metastore_tbls.parquet/_common_metadata
-rw-r--r--   1 nasdag supergroup       2594 2015-12-15 03:34 test0_metastore_tbls.parquet/_metadata
-rw-r--r--   1 nasdag supergroup       1183 2015-12-15 03:34 test0_metastore_tbls.parquet/part-r-00000-fa6cee3f-77fc-47a8-b216-370103090f61.gz.parquet
-rw-r--r--   1 nasdag supergroup       2960 2015-12-15 03:34 test0_metastore_tbls.parquet/part-r-00001-fa6cee3f-77fc-47a8-b216-370103090f61.gz.parquet
root
 |-- TBL_ID: string (nullable = true)
 |-- CREATE_TIME: string (nullable = true)
 |-- DB_ID: string (nullable = true)
 |-- LAST_ACCESS_TIME: string (nullable = true)
 |-- OWNER: string (nullable = true)
 |-- RETENTION: string (nullable = true)
 |-- SD_ID: string (nullable = true)
 |-- TBL_NAME: string (nullable = true)
 |-- TBL_TYPE: string (nullable = true)
 |-- VIEW_EX

In [12]:
df2.write.save("test0_metastore_tbls.parquet", format="parquet", mode="Overwrite")
df4 = sqlContext.read.parquet("test0_metastore_tbls.parquet")
df4.printSchema()

root
 |-- TBL_ID: long (nullable = true)
 |-- CREATE_TIME: integer (nullable = true)
 |-- DB_ID: long (nullable = true)
 |-- LAST_ACCESS_TIME: integer (nullable = true)
 |-- OWNER: string (nullable = true)
 |-- RETENTION: integer (nullable = true)
 |-- SD_ID: long (nullable = true)
 |-- TBL_NAME: string (nullable = true)
 |-- TBL_TYPE: string (nullable = true)
 |-- VIEW_EXPANDED_TEXT: string (nullable = true)
 |-- VIEW_ORIGINAL_TEXT: string (nullable = true)
 |-- LINK_TARGET_ID: long (nullable = true)



In [13]:
!hdfs dfs -rm -r test0_metastore_tbls.csv
!hdfs dfs -rm -r test0_metastore_tbls.parquet

15/12/15 03:35:07 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted test0_metastore_tbls.csv
15/12/15 03:35:10 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted test0_metastore_tbls.parquet


### Connect through JDBC and HiveContext

In [14]:
import pyhs2

In [15]:
conn = pyhs2.connect(host='localhost',
                   port=10000,
                   authMechanism="PLAIN",
                   user='nasdag',
                   password='',
                   database='default')

In [16]:
cur = conn.cursor()
cur.execute("show tables")
pd.DataFrame(cur.fetchall())

Unnamed: 0,0,1
0,master,False
1,salaries,False


In [17]:
#cur = conn.cursor()
#cur.execute("drop table master")

In [18]:
cur = conn.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS Master
      (playerID STRING,
      birthYear INT,
      birthMonth INT,
      birthDay INT,
      birthCountry STRING,
      birthState STRING,
      birthCity STRING,
      deathYear INT,
      deathMonth INT,
      deathDay INT,
      deathCountry STRING,
      deathState STRING,
      deathCity STRING,
      nameFirst STRING,
      nameLast STRING,
      nameGiven STRING,
      weight INT,
      height INT,
      bats STRING,
      throws STRING,
      debut STRING,
      finalGame STRING,
      retroID STRING,
      bbrefID STRING)
      COMMENT 'Master Player Table'
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      STORED AS TEXTFILE""")

In [19]:
cur = conn.cursor()
cur.execute("describe master")
pd.DataFrame(cur.fetchall())

Unnamed: 0,0,1,2
0,playerid,string,
1,birthyear,int,
2,birthmonth,int,
3,birthday,int,
4,birthcountry,string,
5,birthstate,string,
6,birthcity,string,
7,deathyear,int,
8,deathmonth,int,
9,deathday,int,


In [20]:
cur = conn.cursor()
cur.execute("select * from master limit 5")
pd.DataFrame(cur.fetchall())

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,14,15,16,17,18,19,20,21,22,23
0,aardsda01,1981,12,27,USA,CO,Denver,,,,...,Aardsma,David Allan,205,75,R,R,2004-04-06,2013-09-28,aardd001,aardsda01
1,aaronha01,1934,2,5,USA,AL,Mobile,,,,...,Aaron,Henry Louis,180,72,R,R,1954-04-13,1976-10-03,aaroh101,aaronha01
2,aaronto01,1939,8,5,USA,AL,Mobile,1984.0,8.0,16.0,...,Aaron,Tommie Lee,190,75,R,R,1962-04-10,1971-09-26,aarot101,aaronto01
3,aasedo01,1954,9,8,USA,CA,Orange,,,,...,Aase,Donald William,190,75,R,R,1977-07-26,1990-10-03,aased001,aasedo01
4,abadan01,1972,8,25,USA,FL,Palm Beach,,,,...,Abad,Fausto Andres,184,73,L,L,2001-09-10,2006-04-13,abada001,abadan01


In [21]:
from pyspark.sql import HiveContext
sqlContext2 = HiveContext(sc)
sqlContext2.sql("show tables").collect()

[Row(tableName=u'master', isTemporary=False),
 Row(tableName=u'salaries', isTemporary=False)]

In [22]:
sqlContext2.sql("DESCRIBE Master").collect()

[Row(col_name=u'playerid', data_type=u'string', comment=None),
 Row(col_name=u'birthyear', data_type=u'int', comment=None),
 Row(col_name=u'birthmonth', data_type=u'int', comment=None),
 Row(col_name=u'birthday', data_type=u'int', comment=None),
 Row(col_name=u'birthcountry', data_type=u'string', comment=None),
 Row(col_name=u'birthstate', data_type=u'string', comment=None),
 Row(col_name=u'birthcity', data_type=u'string', comment=None),
 Row(col_name=u'deathyear', data_type=u'int', comment=None),
 Row(col_name=u'deathmonth', data_type=u'int', comment=None),
 Row(col_name=u'deathday', data_type=u'int', comment=None),
 Row(col_name=u'deathcountry', data_type=u'string', comment=None),
 Row(col_name=u'deathstate', data_type=u'string', comment=None),
 Row(col_name=u'deathcity', data_type=u'string', comment=None),
 Row(col_name=u'namefirst', data_type=u'string', comment=None),
 Row(col_name=u'namelast', data_type=u'string', comment=None),
 Row(col_name=u'namegiven', data_type=u'string', com

### not supported

In [23]:
sqlContext.read.format('jdbc').options(source="jdbc",driver="org.apache.hive.jdbc.HiveDriver",url='jdbc:hive2://localhost:10000/default?user=nasdag&password=', dbtable='master').load()

Py4JJavaError: An error occurred while calling o71.load.
: java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:135)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
	at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


In [24]:
sc.stop()