## To make the libraries you have uploaded in the Library Manager available in this Notebook, run the command below to get started

```run -i platform-libs/initialize.py```

In [1]:
run -i platform-libs/initialize.py

You are all set to explore!!!


In [2]:
from pipelineblocksdk.construct.base.StreamBlock import StreamBlock
from pipelineblocksdk.api.Singleton import Singleton
from pipelineblocksdk.data.spark.SparkConfCustom import SparkConfCustom
from pipelineblocksdk.util.async_util import async
from pipelineblocksdk.util.kerbUtil import generate_ticket_granting_ticket
from pipelineblocksdk.util.ThirdPartyIntegration import get_oracle_creds

from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

from shutil import rmtree
from ast import literal_eval

import time
import uuid
import os
import requests
import json
import pathlib
import traceback
from threading import Thread


# Following code is the definition for the block (the class derives from StreamBlock)

class MyBlock(StreamBlock, Singleton):
    """Block that joins a left and a right data source with a Spark SQL query.

    Each source is loaded from a queue topic, HDFS or Oracle (see ``get_df``);
    the joined dataframe is written out or streamed to an output topic
    (see ``run``).
    """

    # Prefix used to build every local temporary path.
    base_temp_path = '/tmp/'
    # Local paths registered for removal by clean_temp_files().
    # NOTE(review): mutable class attribute - the same list object is shared by
    # every instance unless an instance rebinds it.
    temp_file_paths = []
    # HDFS client created by get_client(); set in run() when it is needed.
    hdfs_client = None
    # Spark session created in run() from the "SparkConf" input.
    spark = None
    # Taken from self.data_handler.api_instance in run().
    kafka_api_instance = None
    # Dataframes for the two inputs and for the join result.
    left_df = None
    right_df = None
    join_df = None
    # Output topic name and the sorted part files consumed by stream().
    output_topic = None
    part_files = None
    # This is the entry point for the block. This is a mandatory function.
    def run(self):
        """Join the left and right data sources and publish the result.

        Sources that name a queue topic are first downloaded to local files by
        background threads. Both sides are then loaded as Spark dataframes,
        registered as the temp views ``left`` and ``right`` and joined with
        ``DataTarget.join_query``. The result is written directly with Spark,
        streamed to a newly created output topic, or merged into one CSV file
        and uploaded to HDFS.

        Returns:
            dict: ``{"queueTopicName": <topic>}`` when the target is a stream,
            otherwise ``{"file_path": <DataTarget.filePath>}``.

        Raises:
            FileExistsError: the target exists and ``overwrite`` is not True.
            Exception: any other failure is logged with its traceback and
                re-raised.
        """
        t1 = time.time()
        output_dict = dict()
        self.logger.info('Run function says: SparkJoin')
        self.logger.info(f"Inputs available: {self.input_dict}")
        os.makedirs(self.base_temp_path, exist_ok=True)
        self.kafka_api_instance = self.data_handler.api_instance

        self.spark = SparkConfCustom(self.input_dict["SparkConf"]).get_spark_session()
        self.spark.sparkContext.setLogLevel('INFO')
        try:
            left_source = self.input_dict['LeftDataSource']
            right_source = self.input_dict['RightDataSource']
            target = self.input_dict['DataTarget']
            connection = self.input_dict['ConnectionParams']

            left_path, left_thread, right_path, right_thread = None, None, None, None
            if left_source['queueTopicName'] and len(left_source['queueTopicName']) > 3:
                left_path, left_thread = self.get_streaming_path(left_source)

            if right_source['queueTopicName'] and len(right_source['queueTopicName']) > 3:
                right_path, right_thread = self.get_streaming_path(right_source)

            # Waiting for streams to download the data to local disk
            if left_thread:
                left_thread.join()

            if right_thread:
                right_thread.join()

            self.logger.info(target['stream'])

            # One client is enough whether it is needed for reading, writing or both.
            # NOTE(review): the stream flag is tested with "is not True" here but
            # the write step below also accepts the string 'true' - confirm which
            # spellings callers really send.
            reads_hdfs = ('type' in left_source and left_source['type'] == 'hdfs') \
                or ('type' in right_source and right_source['type'] == 'hdfs')
            if reads_hdfs or target['stream'] is not True:
                self.hdfs_client = self.get_client(self.block_params, connection)

            self.left_df = self.get_df(self.spark, left_source, self.block_params, left_path)
            self.logger.info(str(self.left_df.schema.json()))
            self.right_df = self.get_df(self.spark, right_source, self.block_params, right_path)
            self.logger.info(str(self.right_df.schema.json()))
            self.spark.sparkContext.setLogLevel("INFO")

            self.right_df.show()

            self.left_df.createOrReplaceTempView('left')
            self.right_df.createOrReplaceTempView('right')

            sql_query = target['join_query']
            self.logger.info(f"Query is {sql_query}")
            self.join_df = self.spark.sql(sql_query)

            # Rename duplicate columns: the first occurrence of a repeated name
            # gets a '_1' suffix. The pass is repeated until every name is
            # unique, so a name occurring three or more times is handled too.
            col_list = list(self.join_df.columns)
            while len(set(col_list)) != len(col_list):
                for col in col_list:
                    if col_list.count(col) > 1:
                        col_list[col_list.index(col)] = col + '_1'
            self.logger.info(self.join_df.columns)
            self.join_df = self.join_df.toDF(*col_list)
            self.logger.info(self.join_df.columns)

            # Write df to HDFS or write to file
            join_time_st = time.time()
            # NOTE(review): this branch is only taken when 'kerberos' is the
            # string 'false'; a boolean False goes to the else branch - confirm
            # that this is intended.
            if connection['kerberos'] == 'false' \
                    and str(target['filePath']).endswith('.parquet'):
                # Write to HDFS
                self._clear_or_reject_target(target['filePath'], target['overwrite'])
                protocol = 'https://' if connection['https'] == 'true' else 'http://'
                # host and port have to be separated by ':' to form a valid URL
                hdfs_path = f"{protocol}{connection['hostName']}:{connection['port']}/" \
                    + str(target['filePath']).lstrip('/')
                # NOTE(review): the path ends with '.parquet' but CSV is written,
                # and an http(s) URL is handed to Spark - confirm the scheme.
                self.join_df.write.mode("overwrite").option("header", "true").csv(hdfs_path)
                join_time_end = time.time()
                self.logger.info('Time for Join: ' + str(join_time_end - join_time_st))
            else:
                # Write to file and upload to HDFS
                temp_fp = self.base_temp_path + str(t1) + '.csv'
                # Registered so the part-file directory is removed by clean_temp_files()
                self.temp_file_paths.append(temp_fp)
                self.join_df.write.mode("overwrite").option("header", "false").csv(temp_fp)
                join_time_end = time.time()
                # combine to single file
                combine_start_time = time.time()
                files = sorted(pathlib.Path(temp_fp).glob('*.csv'))
                if target['stream'] == 'true' or target['stream'] is True:
                    self.output_topic, self.producer = self.data_handler.create_producer(str(uuid.uuid4()))
                    self.part_files = files
                    self.stream()
                    output_dict["queueTopicName"] = self.output_topic
                    return output_dict

                self._clear_or_reject_target(target['filePath'], target['overwrite'])
                single_out_file = self.base_temp_path + str(combine_start_time) + '.csv'
                self.temp_file_paths.append(single_out_file)
                with open(single_out_file, 'a') as out_file:
                    out_file.write(",".join(self.join_df.columns))
                    for f in files:
                        self.logger.info(f'{f} appended to single csv')
                        last_line = None
                        with open(str(f), 'r') as in_file:
                            for line in in_file:
                                if len(line) > 1:
                                    # Drop the line's own terminator; keeping it
                                    # would leave a blank line after every row.
                                    out_file.write('\n' + line.rstrip('\r\n'))
                                last_line = line
                        # last_line stays None for an empty part file
                        self.logger.info(f'Last line is: {last_line}')

                print(
                    f'Done with combining part files to a single file.\n Time taken: {time.time() - combine_start_time}')

                write_start_time = time.time()
                self.logger.info("Writing to HDFS:")
                self.block_folder_write(self.hdfs_client, single_out_file, target['filePath'],
                                        target['overwrite'])
                self.logger.info("Time taken to write to HDFS: " + str(time.time() - write_start_time))
                self.logger.info('Time for Join: ' + str(join_time_end - join_time_st))
        except Exception as e:
            some_track = traceback.format_exc()
            self.logger.error(f"Error: {str(e)} \n trace: {some_track}")
            self.logger.error(e)
            raise e

        self.clean_temp_files()
        # Set the output parameter
        output_dict['file_path'] = self.input_dict['DataTarget']['filePath']
        # Set the status of the block as completed
        self.block_status = "COMPLETED"

        return output_dict

    def _clear_or_reject_target(self, file_path, overwrite):
        """Delete an existing HDFS target when overwriting is allowed, else fail.

        Raises:
            FileExistsError: ``file_path`` exists and ``overwrite`` is not True.
        """
        if not self.file_exits(self.hdfs_client, file_path):
            return
        if overwrite is True:
            # remove file
            self.delete_file(self.hdfs_client, file_path)
        else:
            raise FileExistsError("File Already Exists: " + str(file_path))

    def clean_temp_files(self):
        self.logger.info("Cleaning temp files:")
        for file in self.temp_file_paths:
            rmtree(path=file, ignore_errors=True)

    # HDFS Client
    def get_client(self, block_params=None, connection_params=None):
        """Create an HDFS client and verify it by listing the root directory.

        Args:
            block_params: passed to ``generate_ticket_granting_ticket`` when
                Kerberos is enabled.
            connection_params: mapping with ``hostName`` and ``port`` and the
                optional flags ``https`` and ``kerberos`` (booleans or
                'true'/'false' strings), plus ``authName`` for Kerberos.

        Returns:
            A ``KerberosClient`` when Kerberos is enabled, otherwise an
            ``InsecureClient``.

        Raises:
            Exception: any failure while building or probing the client is
                logged and re-raised.
        """

        def as_flag(value):
            # Flags may be booleans or strings. bool('false') is True, so the
            # textual "off" spellings have to be recognised explicitly.
            if isinstance(value, str):
                return value.strip().lower() not in ('', 'false', '0', 'no', 'off')
            return bool(value)

        try:
            method = "https"
            if "https" in connection_params:
                method = "https" if as_flag(connection_params["https"]) else "http"

            host_name = connection_params["hostName"]
            port = connection_params["port"]

            kerb_auth = 'kerberos' in connection_params and as_flag(connection_params['kerberos'])

            if kerb_auth:
                principal = generate_ticket_granting_ticket(block_params, connection_params["authName"])
                session = requests.Session()
                # NOTE(review): TLS certificate verification is disabled here.
                session.verify = False
                full_host = "%s://%s:%s" % (method, host_name, port)
                client = KerberosClient(url=full_host, session=session, mutual_auth='OPTIONAL', principal=principal)
            else:
                # NOTE(review): the insecure client always uses plain http and
                # ignores the https flag - confirm that this is intended.
                # Formatting (instead of "+") also accepts an integer port.
                client = InsecureClient(f"http://{host_name}:{port}")
            # Fail fast when the connection details are wrong.
            client.list('/')
            return client
        except Exception as e:
            self.logger.error("Error Occurred While Connecting to HDFS With Given Connection Details")
            raise e

    def get_df(self, spark_session, data_source, block_params, path):
        if data_source['queueTopicName'] and len(data_source['queueTopicName']) > 3:
            return self.get_streaming_df(spark_session, path)
        elif data_source['type'] == 'hdfs':
            return self.get_hdfs_df(spark_session, data_source, self.hdfs_client)
        elif data_source['type'] == 'oracle':
            return self.get_oracle_df(spark_session, data_source, block_params)
        else:
            raise Exception('please provide a valid type')

    def get_hdfs_df(self, spark_session, data_source, hdfs_connection):
        """Download an HDFS file to local disk and load it as a CSV dataframe.

        The local copy is named after the current timestamp under
        ``base_temp_path`` and is registered in ``temp_file_paths``. The
        ``header`` and ``delimiter`` reader options come from ``data_source``;
        the schema is inferred.
        """
        self.logger.info("Reading HDFS file to local")
        local_copy = self.base_temp_path + str(time.time()) + '.csv'
        hdfs_connection.download(
            data_source['fileWithFullPath'],
            local_copy,
            n_threads=-1,
            chunk_size=5000000,
            overwrite=True,
        )
        self.logger.info("Creating dataframe from HDFS")
        self.temp_file_paths.append(local_copy)

        reader = spark_session.read.format("csv")
        reader = reader.option("header", data_source['header'])
        reader = reader.option("inferSchema", "true")
        reader = reader.option("delimiter", data_source['delimiter'])
        return reader.load(local_copy)

    def get_oracle_df(self, spark_session, data_source, block_params):
        """Load the result of ``data_source['query']`` from Oracle over JDBC.

        Credentials are looked up with ``get_oracle_creds`` using the caller's
        ``userAuthToken`` and the configured ``connection_name``.
        """
        self.logger.info("Creating Oracle dataframe")
        creds = get_oracle_creds(
            user_id=block_params["userAuthToken"],
            authentication_name=data_source['connection_name'],
        )
        jdbc_options = {
            "url": "jdbc:oracle:thin:@%s:%s:%s" % (creds["host"], creds["port"], creds["sid"]),
            "driver": "oracle.jdbc.driver.OracleDriver",
            "dbtable": data_source['query'],
            "fetchSize": 1000000,
            "user": creds["username"],
            "password": creds["password"],
        }
        jdbc_reader = spark_session.read.format("jdbc")
        return jdbc_reader.options(**jdbc_options).load()

    def get_streaming_path(self, data_source):
        """Start downloading a queue topic to a local file in the background.

        A consumer is created for ``data_source['queueTopicName']``, the topic
        schema is fetched, and ``read_records`` is started on a new thread that
        writes to a timestamp-named path under ``base_temp_path``.

        Returns:
            tuple: ``(local_path, thread)``; the caller joins the thread before
            reading the file.

        Raises:
            Exception: creating the consumer failed (the block status is set
                to "FAILED" first).
        """
        topic_name = data_source['queueTopicName']
        api = self.kafka_api_instance
        consumer_request = dict(
            count=1,
            groupId=str(uuid.uuid4()),
            registerId="",
            topicsListToSubscribe=[topic_name],
        )
        try:
            channel = api.create_consumer_list_using_post(consumer_request).result
        except Exception as e:
            self.logger.error("Error Trying To Create a Consumer Of Topic:" + str(topic_name))
            self.block_status = "FAILED"
            raise e

        # The topic metadata carries the schema as JSON nested inside JSON.
        topic_meta = api.get_topic_meta_using_post({"topicName": topic_name})
        schema = json.loads(json.loads(topic_meta.result)["schema"])
        self.logger.info(f"Schema: {schema}")

        f_path = self.base_temp_path + str(time.time())
        if os.path.exists(f_path):
            rmtree(f_path)

        worker_args = (topic_name, schema, api, f_path, channel, self.logger)
        read_stream_thread = Thread(target=self.read_records, args=worker_args)
        read_stream_thread.start()
        self.temp_file_paths.append(f_path)
        return f_path, read_stream_thread

    def get_streaming_df(self, spark, path):
        """Load the comma-delimited file at ``path`` with a header row and an inferred schema."""
        reader = spark.read.format("csv")
        for option_name, option_value in (("header", "true"), ("inferSchema", "true"), ("delimiter", ",")):
            reader = reader.option(option_name, option_value)
        return reader.load(path)

    def read_records(self, topic, schema, kafka_api_instance, file_path=None, channel=None, logger=None):
        """Drain a topic into a local comma-separated file.

        The schema keys are written as the header line. Messages are then
        polled until a poll returns nothing and the producer status reports a
        falsy ``value``. Every message is parsed with ``literal_eval`` and its
        items are written as one comma-joined line.

        Args:
            topic: name of the topic being read.
            schema: mapping whose keys are the column names.
            kafka_api_instance: client used to poll messages and query the
                producer status.
            file_path: local file the records are appended to.
            channel: consumer channel id used for polling.
            logger: logger used for progress and errors.

        Errors are logged and not re-raised.
        NOTE(review): this runs on a worker thread, so a failure here leaves a
        partial file that the caller still reads - consider reporting it.
        """
        try:
            logger.info(f'Started reading topic {topic}')
            read_msgs_channel = {
                "channelId": channel,
                "registerId": ""
            }
            # Built once under its own name: rebinding ``topic`` to this dict
            # nested the request one level deeper on every further empty poll.
            producer_status_req = {"topicName": topic}
            t_records = 0
            with open(file_path, 'a', buffering=50 * (1024 ** 2)) as writer:
                writer.write(",".join(schema.keys()))
                while True:
                    read_msgs_res = kafka_api_instance.read_messages_from_topic_using_post(read_msgs_channel)
                    msgs = read_msgs_res.result
                    logger.info(f'Read: {str(len(msgs))}, Total: {t_records}')

                    if len(msgs) == 0:
                        logger.info('Zero Messages')
                        res = kafka_api_instance.get_producer_status_using_post(producer_status_req)
                        logger.info('Zero messages: ' + json.dumps(res.result))
                        # A falsy 'value' ends the polling loop.
                        if not res.result['value']:
                            break
                    for msg in msgs:
                        t_records = t_records + 1
                        writer.write('\n')
                        # NOTE(review): values are joined without CSV quoting,
                        # so a value containing ',' shifts the columns.
                        writer.write(','.join(map(str, literal_eval(msg))))

            logger.info(f'Done writing topic: {topic} to file: {file_path} Records: {t_records}')
        except Exception as e:
            logger.error(e)

    def file_exits(self, hdfs_connection, file_path):
        self.logger.debug("Inside the file exists check method")
        try:
            return hdfs_connection.status(hdfs_path=file_path, strict=True)
        except Exception as e:
            return False

    def delete_file(self, hdfs_connection, file_path):
        self.logger.debug("Inside delete HDFS file method")
        try:
            return hdfs_connection.delete(file_path)
        except Exception as e:
            return False

    def block_folder_write(self, hdfs_connection, local_file_path, upload_path, overwrite: bool):
        self.logger.info("Inside the write method")
        hdfs_connection.upload(upload_path, local_file_path, n_threads=4,
                               chunk_size=5000000, cleanup=True, overwrite=overwrite)
        self.logger.info("Done writing")

    @async
    def stream(self):
        print('Stream function says: Hello, world!')
        try:
            schema = self.get_df_schema(self.join_df)
            print(f"Join Schema: {schema}")
            self.set_schema(self.output_topic, schema)
            acc_counter = self.spark.sparkContext.accumulator(0)
            # kafka method
            #
            # def do_count(x):
            #     acc_counter.add(1)
            #     return x
            # adf_count = self.join_df.rdd.map(do_count).toDF()
            # adf_count.selectExpr("to_json(struct(*)) AS value") \
            #     .write \
            #     .format("kafka") \
            #     .option("kafka.bootstrap.servers", os.environ['KAFKA_LIST']) \
            #     .option("topic", self.output_topic) \
            #     .save()

            ###  Normal forEach part

            # def customFunction(rows):
            #         for row in rows:
            #             acc_counter.add(1)
            #             my_list = list(row.asDict().values())
            #             # csv_str = ",".join(str(item) for item in my_list)
            #             self.producer.send(my_list)
            #
            # self.join_df.rdd.foreachPartition(customFunction)

            ### using files
            print(f"Partfiles = {self.part_files}")
            for f in self.part_files:
                self.logger.info(f'{f} appending to stream\n')
                with open(str(f), 'r') as in_file:
                    for line in in_file:
                        acc_counter.add(1)
                        self.producer.send(line.split(","))

            total_records = acc_counter.value
            self.logger.info(f"Total records: {total_records}")

            # Update meta to KML
            temp_error = {
                "noOfIgnoredRecords": 0,
                "errorRecords": 0
            }
            ui_info = {"info": "", "error": ""}
            meta_data = {
                "schema": json.dumps(schema),
                "readerInfo": json.dumps({"noOfRecordsRead": total_records}),
                "readerInfoError": json.dumps(temp_error),
                "ui_info": json.dumps(ui_info)
            }

            self.set_meta_data(self.output_topic, meta_data=meta_data)
            self.logger.info(f"Total records read: {str(total_records)}")
        except Exception as e:
            self.block_status = "FAILED"
            some_track = traceback.format_exc()
            self.logger.error(f"Error: {str(e)} \n {str(some_track)}")
            print(f"Error: {str(e)} \n {str(some_track)}")
            raise e
        finally:
            self.producer.close()

        self.clean_temp_files()

    def get_df_schema(self, df):
        """Describe the columns of ``df`` as a dict keyed by upper-cased name.

        Each entry holds the 1-based column position (``order``), an
        ``active`` flag set to True and a type label: 'IntegerType()' for
        integer, 'StringType()' for string and 'FloatType()' for float or
        double columns.

        NOTE(review): columns of any other type are left out of the result -
        confirm that consumers expect this.
        """
        type_labels = (
            ('integer', 'IntegerType()'),
            ('string', 'StringType()'),
            ('float', 'FloatType()'),
            ('double', 'FloatType()'),
        )
        fields = json.loads(df.schema.json())['fields']

        b2s_dict = {}
        for position, field in enumerate(fields, start=1):
            for spark_type, label in type_labels:
                if field['type'] == spark_type:
                    b2s_dict[field['name'].upper()] = {'order': position, 'active': True, 'type': label}
                    break
        return b2s_dict





[DEBUG]:  Adding config - Reader Host = 172.16.109.117:9998
[DEBUG]:  Adding config - Recipe Builder Host = 172.16.109.117:8052
[DEBUG]:  Config file /home/jovyan/work/data/config_notebook.json found! SDK configured.
[DEBUG]:  Using configuration: 
{
    "ADMIN_MODULE": {
        "IP": "172.16.109.117",
        "PORT": "9101"
    },
    "AUTH_TOKEN_ENDPOINTS": {
        "ADMIN_APP_URL": "http://172.16.109.117:9101/"
    },
    "CONSOLE_LOGGING_ENABLED": true,
    "CONTROLLER_LOG_PATH": "/bigbrain/data/logs/",
    "CONTROLLER_PORT": 9094,
    "DEV_MODE": true,
    "GRAPH_APP_ID": "engine",
    "GRAPH_SCHEMA": {
        "LINE_GRAPH": "LINE_GRAPH_v1.1"
    },
    "GRAYLOG_ENABLED": false,
    "JAVA_CONTROLLER_PATH": "/bigbrain/data/java-controller/java-controller-3.0-SNAPSHOT-jar-with-dependencies.jar",
    "KAFKA_BROKER": {
        "HOSTS": [
            "172.16.109.117:9092"
        ]
    },
    "KAFKA_MANAGER_HOST": "172.16.109.117",
    "KAFKA_MANAGER_PORT": 8088,
    "LOGGING_SERVER_

In [3]:
# Instantiate the block and configure it by hand for a notebook test run.
join_blk = MyBlock()

# Block-level parameters; get_oracle_df() passes 'userAuthToken' to get_oracle_creds().
# NOTE(review): the token is hard-coded in the notebook - consider loading it
# from the environment instead.
bp = {
    'userAuthToken': '6d72d2cf-45b6-464c-9811-d6cf21574ece'
}

# Input configuration; presumably stored as input_dict by set_params() - verify.
ip = {
    # Left side: type 'oracle', so connection_name and query are used.
    # The recorded run of this notebook failed on this query with ORA-00942
    # (table or view does not exist).
    "LeftDataSource": {
      "connection_name": "ora_con",
      "query": "(SELECT * from tables)",
      "type": "oracle",
      "fileWithFullPath": "/data/hdfs-nfs/Data/6d72d2cf-45b6-464c-9811-d6cf21574ece/100000SalesRecords1.csv",
      "header": True,
      "delimiter": ",",
      # A topic name of at most three characters means "not a stream".
      "queueTopicName": ""
    },
    # Right side: type 'hdfs', so fileWithFullPath, header and delimiter are used.
    "RightDataSource": {
      "connection_name": "",
      "query": "",
      "type": "hdfs",
      "fileWithFullPath": "/data/hdfs-nfs/Data/6d72d2cf-45b6-464c-9811-d6cf21574ece/100000SalesRecords1.csv",
      "header": True,
      "delimiter": ",",
      "queueTopicName": ""
    },
    "DataTarget":{
        # The query runs against the temp views 'left' and 'right' created in run().
        "join_query": "SELECT * from left l JOIN right r ON l.`Order ID`=r.`Order ID`",
        # False: the result is uploaded to filePath instead of being streamed.
        "stream": False,
        "filePath": "/data/test_phani/join_out_note.csv",
        "delimiter": ",",
        "overwrite": True
    },
    # Connection details handed to get_client().
    "ConnectionParams":{
        
        "hostName": "172.16.109.117",
        "port": "9870",
        "kerberos": False,
        "authName": "",
        "https": False

    },
    # Passed to SparkConfCustom; spark.jars points at the Oracle JDBC driver jar.
    "SparkConf":{
      "spark.memory.fraction": 0.9,
      "spark.storage.memoryFraction": 0.3,
      "spark.jars":"/home/jovyan/work/data/spark-libs/ojdbc8-12.2.0.1.jar"
    }
}

join_blk.set_params(ip,bp)

[DEBUG]:  Message Handler Created
[INFO]:  Set Params Complete


In [4]:
join_blk.run()

[INFO]:  Run function says: SparkJoin
[INFO]:  Inputs available: {'LeftDataSource': {'connection_name': 'ora_con', 'query': '(SELECT * from tables)', 'type': 'oracle', 'fileWithFullPath': '/data/hdfs-nfs/Data/6d72d2cf-45b6-464c-9811-d6cf21574ece/100000SalesRecords1.csv', 'header': True, 'delimiter': ',', 'queueTopicName': ''}, 'RightDataSource': {'connection_name': '', 'query': '', 'type': 'hdfs', 'fileWithFullPath': '/data/hdfs-nfs/Data/6d72d2cf-45b6-464c-9811-d6cf21574ece/100000SalesRecords1.csv', 'header': True, 'delimiter': ',', 'queueTopicName': ''}, 'DataTarget': {'join_query': 'SELECT * from left l JOIN right r ON l.`Order ID`=r.`Order ID`', 'stream': False, 'filePath': '/data/test_phani/join_out_note.csv', 'delimiter': ',', 'overwrite': True}, 'ConnectionParams': {'hostName': '172.16.109.117', 'port': '9870', 'kerberos': False, 'authName': '', 'https': False}, 'SparkConf': {'spark.memory.fraction': 0.9, 'spark.storage.memoryFraction': 0.3, 'spark.jars': '/home/jovyan/work/data/

Py4JJavaError: An error occurred while calling o73.load.
: java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
	at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
	at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
	at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
	at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
	at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
	at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
	at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:53)
	at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:774)
	at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:925)
	at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1111)
	at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:4798)
	at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:4845)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1501)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:60)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [24]:
join_blk.join_df.columns

['Region_1',
 'Country_1',
 'Item Type_1',
 'Sales Channel_1',
 'Order Priority_1',
 'Order Date_1',
 'Order ID_1',
 'Ship Date_1',
 'Units Sold_1',
 'Unit Price_1',
 'Unit Cost_1',
 'Total Revenue_1',
 'Total Cost_1',
 'Total Profit_1',
 'Region',
 'Country',
 'Item Type',
 'Sales Channel',
 'Order Priority',
 'Order Date',
 'Order ID',
 'Ship Date',
 'Units Sold',
 'Unit Price',
 'Unit Cost',
 'Total Revenue',
 'Total Cost',
 'Total Profit']

In [29]:
ls /home/jovyan/work/data/spark-libs/ojdbc8-12.2.0.1.jar

 kafka-clients-2.4.0.jar  'ojdbc8-12.2.0.1 (1).jar'
 ojdbc6-11.2.0.3.jar       ojdbc8-12.2.0.1.jar
 ojdbc6.jar                spark-sql-kafka-0-10_2.11-2.4.5.jar
 ojdbc7.jar                spark-sql-kafka-0-10_2.12-2.4.5.jar


In [5]:
import os
os.environ


environ{'LC_ALL': 'en_US.UTF-8',
        'JUPYTERHUB_CLIENT_ID': 'jupyterhub-user-6d72d2cf-45b6-464c-9811-d6cf21574ece',
        'LANG': 'en_US.UTF-8',
        'HOSTNAME': 'bb2f5fb24eb0',
        'NB_UID': '1000',
        'CONDA_DIR': '/opt/conda',
        'CPU_LIMIT': '2.0',
        'JUPYTERHUB_BASE_URL': '/',
        'MESOS_NATIVE_LIBRARY': '/usr/local/lib/libmesos.so',
        'PWD': '/home/jovyan',
        'HOME': '/home/jovyan',
        'BB_EXECUTION': 'JUPYTER',
        'JUPYTERHUB_USER': '6d72d2cf-45b6-464c-9811-d6cf21574ece',
        'DEBIAN_FRONTEND': 'noninteractive',
        'SPARK_HOME': '/usr/local/spark',
        'NB_USER': 'jovyan',
        'HADOOP_VERSION': '2.7',
        'JUPYTERHUB_SERVICE_PREFIX': '/user/6d72d2cf-45b6-464c-9811-d6cf21574ece/',
        'SHELL': '/bin/bash',
        'SPARK_OPTS': '--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info',
        'MEM_LIMIT': '2147483648',
        'APACHE_SPARK_VERSION

In [None]:
def stream(self):
    """Draft of ``stream`` that writes the part files itself before sending.

    The joined dataframe is written as header-less CSV part files to a
    timestamp-named directory under ``base_temp_path``. Every line of those
    files is then sent through ``self.producer`` and the record count is
    published as metadata of ``output_topic``. The producer is always closed
    and the temporary files are always cleaned up.

    Raises:
        Exception: any failure sets the block status to "FAILED", is logged
            with its traceback and re-raised.
    """
    print('Stream function says: Hello, world!')
    try:
        # The schema has to describe the rows that are sent, i.e. join_df.
        schema = self.get_df_schema(self.join_df)
        print(f"Schema: {schema}")
        self.set_schema(self.output_topic, schema)

        # Write dataframe to files
        # if possible do these steps in run method
        temp_fp = self.base_temp_path + str(time.time()) + '.csv'
        # Registered so clean_temp_files() removes the part-file directory
        self.temp_file_paths.append(temp_fp)
        self.join_df.write.mode("overwrite").option("header", "false").csv(temp_fp)
        # Stored on the instance because the loop below reads self.part_files
        self.part_files = sorted(pathlib.Path(temp_fp).glob('*.csv'))

        # Using files
        total_records = 0
        print(f"Partfiles = {self.part_files}")
        for f in self.part_files:
            self.logger.info(f'{f} appending to stream\n')
            with open(str(f), 'r') as in_file:
                for line in in_file:
                    total_records += 1
                    # Strip the line terminator first, otherwise it ends up
                    # inside the last field of every record.
                    # NOTE(review): a plain split does not honour quoted
                    # fields that contain commas.
                    self.producer.send(line.rstrip('\r\n').split(","))

        self.logger.info(f"Total records: {total_records}")

        # Update meta to KML
        temp_error = {
            "noOfIgnoredRecords": 0,
            "errorRecords": 0
        }
        ui_info = {"info": "", "error": ""}
        meta_data = {
            "schema": json.dumps(schema),
            "readerInfo": json.dumps({"noOfRecordsRead": total_records}),
            "readerInfoError": json.dumps(temp_error),
            "ui_info": json.dumps(ui_info)
        }

        self.set_meta_data(self.output_topic, meta_data=meta_data)
        self.logger.info(f"Total records read: {str(total_records)}")
    except Exception as e:
        self.block_status = "FAILED"
        some_track = traceback.format_exc()
        self.logger.error(f"Error: {str(e)} \n {str(some_track)}")
        print(f"Error: {str(e)} \n {str(some_track)}")
        raise e
    finally:
        try:
            self.producer.close()
        finally:
            # Runs on failure as well, so part files are not left behind.
            self.clean_temp_files()

def get_df_schema(self, df):
    """Build a column description dict from the JSON schema of ``df``.

    Keys are the upper-cased column names. Each value holds the 1-based
    ``order`` of the column, ``active`` set to True and a ``type`` label:
    'IntegerType()' for integer, 'StringType()' for string, 'FloatType()'
    for float and double.

    NOTE(review): columns of any other type are omitted - confirm that this
    is what consumers expect.
    """
    parsed_schema = json.loads(df.schema.json())
    label_by_type = [
        ('integer', 'IntegerType()'),
        ('string', 'StringType()'),
        ('float', 'FloatType()'),
        ('double', 'FloatType()'),
    ]

    described = {}
    for index, column in enumerate(parsed_schema['fields']):
        column_type = column['type']
        for type_name, type_label in label_by_type:
            if column_type == type_name:
                described[column['name'].upper()] = {
                    'order': index + 1,
                    'active': True,
                    'type': type_label,
                }
                break
    return described