# This notebook helps automatically import CSV files into Hive

The cell below imports all needed dependencies. The paths to the CSV files are set in the example cells further down.

In [6]:
import os

The cell below creates the Spark session with Hive support.

In [7]:
from pyspark.sql import SparkSession
# Hive-enabled Spark session; the metastore is reached at thrift://hive:9083.
spark = (
    SparkSession.builder
    .appName("Import parquet schema to hive")
    .config("hive.metastore.uris", "thrift://hive:9083")
    .enableHiveSupport()
    .getOrCreate()
)

The function below builds the SQL script that creates a table in Hive, using the DataFrame's column names and types as the table columns.

In [8]:
def getCreateTableScriptCSV(databaseName, tableName, path, df):
    """Build the ``CREATE EXTERNAL TABLE`` statement for comma-delimited data.

    The column list is derived from ``df.dtypes`` (pairs of column name and
    type string); spaces in column names are replaced with underscores.
    The finished statement is printed and returned.

    Args:
        databaseName: Database part of the qualified table name.
        tableName: Table part of the qualified table name.
        path: Value placed in the statement's ``LOCATION`` clause.
        df: Object whose ``dtypes`` attribute yields ``(name, type)`` pairs.

    Returns:
        The generated SQL statement as a string.
    """
    column_defs = [
        name.replace(" ", "_") + " " + dtype
        for name, dtype in df.dtypes
    ]

    header = "CREATE EXTERNAL TABLE " + databaseName + "." + tableName + "("
    # The first line of each file is declared as a header and skipped.
    trailer = ") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path + "' TBLPROPERTIES('skip.header.line.count'='1') "

    script = header + ", ".join(column_defs) + trailer
    print (script)
    return script

In [9]:
# Main entry point: create a Hive table from the CSV data found at 'path'.
def createTableCSV(databaseName, tableName, path):
    """Create the table ``databaseName.tableName`` over the CSV data at ``path``.

    The CSV data is loaded with Spark (first line as header, schema
    inferred) to obtain the column names and types; the generated
    CREATE TABLE statement is then executed through ``spark.sql``.

    Args:
        databaseName: Database in which the table is created.
        tableName: Name of the table to create.
        path: Location of the CSV data; used both for reading the data
            and as the table's LOCATION.
    """
    csv_reader = spark.read.format("csv")
    csv_reader = csv_reader.option("header", "true").option("inferschema","true")
    csv_df = csv_reader.load(path)
    spark.sql(getCreateTableScriptCSV(databaseName, tableName, path, csv_df))

## One file example

In [10]:
# Location of the single CSV file to register as a Hive table.
my_csv_file_path = 'v3io://users/admin/examples/demo.csv'
createTableCSV("test", "csv_table", my_csv_file_path)

AnalysisException: 'java.lang.RuntimeException: java.lang.SecurityException: V3IO error in UserContext(FileStatus(/tmp/hive/iguazio/8796a008-d3ec-4315-bb03-6b51740814dd,Stream(SIZE, MODE, ACCESS_TIME, MODIFY_TIME, CREATE_TIME, USER_ID, GROUP_ID))): PERMISSION_DENIED(-13) - {"ErrorCode": -13,"ErrorMessage": "Permission denied"};'

## One folder example for spark output job

In [88]:
# Location of the folder whose CSV files together form one Hive table.
folder_path = 'v3io://users/admin/examples/csvs/'
createTableCSV("test", "table_from_dir2", folder_path)

CREATE EXTERNAL TABLE test.table_from_dir2(id int, street string, city string, zip int, state string, beds int, baths int, sq__ft int, type string, sale_date string, price int, latitude double, longitude double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/' TBLPROPERTIES('skip.header.line.count'='1') 


# Multiple files and folders example

Set the name of the database and the path to the folder where all the CSV files (or folders containing them) are located. The database must already exist.
The code in this cell goes over all files and directories in the provided path and creates a table from each of them.
Files must end with the `.csv` extension and must be "," separated.
A directory containing CSV files must not have a name that starts with "." (such entries are skipped).
The name of the file (without `.csv`) or directory is used as the table name.

In [89]:
# Target database (must already exist) and the local mount path of the
# folder to scan for CSV files and folders of CSV files.
databaseName = "test"
filepath = "/v3io/users/admin/examples/csvs"

# The folder is listed through the local /v3io/ mount but handed to Spark
# with the v3io:// scheme; translate the prefix once, outside the loop.
v3ioFolder = filepath.replace("/v3io/", "v3io://", 1)

for fileOrDir in os.listdir(filepath):
    if fileOrDir.endswith(".csv"):
        # Table name is the part of the entry name before the first ".csv".
        createTableCSV(databaseName, fileOrDir.split(".csv")[0], v3ioFolder + "/" + fileOrDir)
    elif not fileOrDir.startswith(".") and os.path.isdir(os.path.join(filepath, fileOrDir)):
        # Only real, non-hidden directories are loaded as "<dir>/*"; stray
        # non-CSV files are skipped instead of failing the whole loop.
        createTableCSV(databaseName, fileOrDir, v3ioFolder + "/" + fileOrDir + "/*")



CREATE EXTERNAL TABLE test.demo1(id int, street string, city string, zip int, state string, beds int, baths int, sq__ft int, type string, sale_date string, price int, latitude double, longitude double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/demo1.csv' TBLPROPERTIES('skip.header.line.count'='1') 
CREATE EXTERNAL TABLE test.demo2(id int, street string, city string, zip int, state string, beds int, baths int, sq__ft int, type string, sale_date string, price int, latitude double, longitude double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/demo2.csv' TBLPROPERTIES('skip.header.line.count'='1') 


# Test how it works

In [11]:
# Inspect which databases and tables are registered in the metastore.
# To remove everything created by this notebook, drop the whole database:
#spark.sql("drop database test CASCADE")
# IF EXISTS keeps this cell runnable on a fresh kernel, where the table
# may never have been created or may already have been dropped.
spark.sql("drop table if exists " + databaseName + ".example1")
spark.sql("show databases").show()
spark.sql("show tables in " + databaseName).show()

+------------+
|databaseName|
+------------+
|     default|
|        test|
+------------+

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|    test|                dir1|      false|
|    test|      table_from_dir|      false|
|    test|     table_from_dir2|      false|
|    test|table_from_single...|      false|
|    test|table_from_single...|      false|
|    test|           userdata1|      false|
|    test|           userdata2|      false|
|    test|           userdata3|      false|
+--------+--------------------+-----------+



In [None]:
# Query a table by its qualified name; the call is the cell's last
# expression, so the resulting DataFrame is what the cell displays.
tableName = "example1"
spark.sql("select * from " + ".".join((databaseName, tableName)))