In [1]:
#! /usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd


SparkContext.getOrCreate(SparkConf())
spark = SparkSession.builder.master('yarn').enableHiveSupport().getOrCreate()
sc = SparkContext.getOrCreate(SparkConf())
sqlContext = SQLContext(sc)

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

spark.conf.set("spark.sql.session.timezone", "GMT")
spark.conf.set("spark.sql.execution.arrow", "GMT")

In [2]:
pd_f = pd.read_csv('from_bda2.csv', header = 0, sep = ';')
mySchema = StructType([ StructField("RATE_TO_CRNC_GID", StringType(), True),\
                        StructField("RATE_FROM_CRNC_GID", StringType(), True),
                        StructField("RATE_VALUE", FloatType(), True),
                        StructField("RATE_ROW_STATUS", StringType(), True),
                        StructField("RATE_VALID_BEGIN", StringType(), True),
                        StructField("RATE_VALID_END", StringType(), True)]
    )

#pd_f['RATE_VALID_BEGIN'] = pd.to_datetime(pd_f['RATE_VALID_BEGIN'])
#pd_f['RATE_VALID_END'] = pd.to_datetime(pd_f['RATE_VALID_END'])

rates = spark.createDataFrame(pd_f, schema = mySchema)
rates.createOrReplaceTempView("rates")


In [3]:
rates.show()

+----------------+------------------+------------+---------------+-------------------+-------------------+
|RATE_TO_CRNC_GID|RATE_FROM_CRNC_GID|  RATE_VALUE|RATE_ROW_STATUS|   RATE_VALID_BEGIN|     RATE_VALID_END|
+----------------+------------------+------------+---------------+-------------------+-------------------+
|             JPY|               BYR|0.0061831963|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
|             KGS|               BYR|0.0027317414|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
|             KRW|               BYR| 0.014224724|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
|             KZT|               BYR| 1.832138E-4|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
|             MDL|               BYR| 0.006523749|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
|             MTL|               BYR|  1.36147E-4|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|
+----------------+------------------+

In [4]:
rates_new = rates.select(
    trim(col("RATE_TO_CRNC_GID")).alias("RATE_TO_CRNC_GID"),
    trim(col("RATE_FROM_CRNC_GID")).alias("RATE_FROM_CRNC_GID"),
    col("RATE_VALUE").cast(DoubleType()).alias("RATE_VALUE"),
    col("RATE_ROW_STATUS"),
    to_timestamp(col("RATE_VALID_BEGIN")).alias("RATE_VALID_BEGIN"),
    to_timestamp(col("RATE_VALID_END")).alias("RATE_VALID_END"),
    lit(100).alias("some_number"),
    to_timestamp(lit("2020-01-01")).alias("some_date")
    )

In [5]:
rates_new.show()

+----------------+------------------+--------------------+---------------+-------------------+-------------------+-----------+-------------------+
|RATE_TO_CRNC_GID|RATE_FROM_CRNC_GID|          RATE_VALUE|RATE_ROW_STATUS|   RATE_VALID_BEGIN|     RATE_VALID_END|some_number|          some_date|
+----------------+------------------+--------------------+---------------+-------------------+-------------------+-----------+-------------------+
|             JPY|               BYR|0.006183196324855089|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|        100|2020-01-01 00:00:00|
|             KGS|               BYR|0.002731741406023...|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|        100|2020-01-01 00:00:00|
|             KRW|               BYR|0.014224723912775517|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|        100|2020-01-01 00:00:00|
|             KZT|               BYR|1.832137932069599...|              A|2016-12-30 00:00:00|2016-12-30 00:00:00|    