In [1]:
from datetime import datetime


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, ArrayType, NumericType
from pyspark.sql import functions as F
import unittest
from etl.conversation_guid_ref_etl import ConversationGuidRef



class SparkETLTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession \
            .builder \
            .appName("conversation_guid_ref_Logic_unit_test") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_transform_conversation_web_traffic(self):
        input_schema_msg_cus= StructType([
            StructField('conversation_id', StringType(), True),
            StructField('conversation_start_utc_ts', StringType(), True),
            StructField('conversation_end_utc_ts', StringType(), True),
            StructField('conversation_end_utc_date', StringType(), True),
            StructField('visit_guids', ArrayType() , True),
        ])

        input_msg_cus = [
            ('116f7d45-f027-41f4-aaa7-4652122d7c1e', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['2b6315d4-1254-5b15-b6f7-1f6570893854',' 31a8dd63-92ae-4922-8d0a-70a063e95142']),
            ('b18b621b-880a-487a-ada2-27e98e7d91b5', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['3762e72e-218e-5076-aed0-bb95558686fd']),
            ('0457bec0-2d97-4e50-85d8-490f16c3659d', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['21117d4f-3f13-5429-9d98-dcce83a889a7']),
            ('3507506b-78f5-4179-b603-1740c5d24b2e', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', None),
            ('hjsghsjkdd28378291', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', None),
        ]

        input_schema_msg = StructType([
            StructField('conversation_id', StringType(), True),
            StructField('account_id', NumericType(), True),
            StructField('visitor_app_id', StringType(), True),
            StructField('conversation_source', StringType(), True),
            StructField('visitor_device_type', StringType(), True),
        ])

        input_msg = [
            ('116f7d45-f027-41f4-aaa7-4652122d7c1e', 30187337, 'twilio', 'SMS', None),
            ('b18b621b-880a-487a-ada2-27e98e7d91b5', 30187337, 'twilio', 'SMS', None),
            ('3507506b-78f5-4179-b603-1740c5d24b2e', 30187337, 'twilio', 'SMS', 'Desktop'),
            ('0457bec0-2d97-4e50-85d8-490f16c3659d', 30187337, 'twilio', 'SMS', 'Desktop'),
            ('ASVSJJS', 128732, 'twilio', 'SMS', 'Desktop')
        ]

        msg_cus_df = self.spark.createDataFrame(data=input_msg_cus, schema=input_schema_msg_cus)
        msg_df = self.spark.createDataFrame(data=input_msg, schema=input_schema_msg)
        
        expected_schema = StructType([
            StructField('parent_contact_id', StringType(), True),
            StructField('conversation_start_utc_ts', StringType(), True),
            StructField('conversation_end_utc_ts', StringType(), True),
            StructField('conversation_end_utc_date', StringType(), True),
            StructField('visit_guids', ArrayType(), True),
            StructField('app_id', NumericType(), True),
            StructField('visitor_app_id', StringType(), True),
            StructField('visitor_device_type', StringType(), True),
            StructField('conversation_source', StringType(), True),
        ])

        expected_data = [
            ('30187337-116f7d45-f027-41f4-aaa7-4652122d7c1e', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['2b6315d4-1254-5b15-b6f7-1f6570893854',' 31a8dd63-92ae-4922-8d0a-70a063e95142'], 'twilio', 'SMS', None),
            ('30187337-b18b621b-880a-487a-ada2-27e98e7d91b5', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['3762e72e-218e-5076-aed0-bb95558686fd'], 'twilio', 'SMS', None),
            ('30187337-0457bec0-2d97-4e50-85d8-490f16c3659d', '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', ['21117d4f-3f13-5429-9d98-dcce83a889a7'], 'twilio', 'SMS', None),
            ('30187337-3507506b-78f5-4179-b603-1740c5d24b2e',  '2019-10-09 20:28:56.558', '2019-10-09 20:29:49.716', None, 'twilio', 'SMS', None)
        ]

        expected_df = self.spark.createDataFrame(data=expected_data, schema=expected_schema)

        transformed_df = ConversationGuidRef.transfrom_conversation_web_traffic(msg_cus_df, msg_df)
        
        transformed_df = transformed_df.select(
            transformed_df.parent_contact_id,
            transformed_df.conversation_start_utc_ts,
            transformed_df.conversation_end_utc_ts,
            transformed_df.conversation_end_utc_date,
            transformed_df.visit_guids,
            transformed_df.app_id,
            transformed_df.visitor_device_type,
            transformed_df.conversation_source
        )

        transformed_df.show()
        expected_df.show()

        # Compare schema of transformed_df and expected_df
        field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
        fields1 = [*map(field_list, transformed_df.schema.fields)]
        fields2 = [*map(field_list, expected_df.schema.fields)]
        res = set(fields1) == set(fields2)
        # print(res)
        # assert
        self.assertTrue(res)
        # Compare data in transformed_df and expected_df
        self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

    def test_second_stuff(self):
        print('second stuffs')
        self.assertTrue(1)


if __name__ == '__main__':
    unittest.main()

E
ERROR: /Users/vshah/Library/Jupyter/runtime/kernel-87dc8301-3d71-49dd-8802-8f26bba25760 (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/Users/vshah/Library/Jupyter/runtime/kernel-87dc8301-3d71-49dd-8802-8f26bba25760'

----------------------------------------------------------------------
Ran 1 test in 0.001s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [2]:
python -m unittest -v

SyntaxError: invalid syntax (118900820.py, line 1)