### <center>Abstractive Summarization of Earnings Call Transcripts </center>

In [1]:
# Install sparknlp
!pip install --upgrade sparknlp



In [2]:
import os
import sparknlp

from pyspark.sql.types import *
from pyspark.ml import Pipeline

os.getcwd()

'/home/jovyan/work'

> To get started you will need to include the JDBC driver for you particular database on the spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:


In [5]:
from pyspark.sql import SparkSession

# Initiating a Spark NLP Session 
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.python.worker.memory","6G") \
    .config("spark.driver.memory","24G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.serializer","org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.1.1")\
    .config("spark.driver.extraClassPath", "./mysql-connector-java-8.0.25.jar")\
    .config("spark.worker.cleanup.enabled",True)\
    .getOrCreate()

# // Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://sem_mysql:3306/test_db") \
    .option("dbtable", "earningscall") \
    .option("user", "es_user") \
    .option("password", "watchword") \
    .load()

print(jdbcDF.columns)

['companyid', 'ticker', 'company_name', 'filepath', 'transcript']


In [6]:
jdbcDF.show(5)

+---------+------+--------------------+--------+----------+
|companyid|ticker|        company_name|filepath|transcript|
+---------+------+--------------------+--------+----------+
|        1|   AAP|Advance Auto Part...| AAP.txt|          |
|        2|    AI|               C3.ai|  AI.txt|          |
|        3|  AMAT|Applied Materials...|AMAT.txt|          |
|        4|  AMSC|American Supercon...|AMSC.txt|          |
|        5|  ANET|Arista Networks, ...|ANET.txt|          |
+---------+------+--------------------+--------+----------+
only showing top 5 rows



In [7]:
from pyspark.sql.functions import concat,lit

folder_path = os.getcwd()
folder_name =  "earnings_call_transcripts"
file_path = os.path.join(folder_path, folder_name, '')

jdbcDF = jdbcDF.select(concat(lit('file:'),lit(file_path),jdbcDF.filepath) \
               .alias("local_filepath"),'companyid', 'ticker', 'company_name', 'filepath', 'transcript')

# jdbcDF.select(concat_ws('/',lit(local_path),lit(folder_name),jdbcDF.filepath)
#               .alias("local_filepath"),'companyid', 'ticker', 'company_name', 'filepath', 'transcript').select('local_filepath').toPandas()['local_filepath'][0]

jdbcDF.printSchema()

root
 |-- local_filepath: string (nullable = true)
 |-- companyid: integer (nullable = true)
 |-- ticker: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- filepath: string (nullable = true)
 |-- transcript: string (nullable = true)



In [8]:
from pyspark.sql.types import *
from pyspark.sql.functions import *    
# from pyspark.sql.functions import monotonically_increasing_id 

file_path = "./earnings_call_transcripts/"
texts = spark.sparkContext.wholeTextFiles(file_path,use_unicode=True)

schema = StructType([
    StructField('path', StringType()),
    StructField('raw_text', StringType())
#     StructField('text_len',IntegerType())
])

# Add index and Handle escape sequence of \x00 before tokenizing the document
textdf = spark.createDataFrame(texts, schema=schema).persist() \
              .withColumn("index", monotonically_increasing_id()) \
              .withColumn('text', regexp_replace('raw_text', '\x00|\n', ''))

# Add text length to the table
import pyspark.sql.functions as f

textdf = textdf.withColumn("original_text_length",f.length(col("text")))

textdf.printSchema()

textdf.show(5)

root
 |-- path: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- index: long (nullable = false)
 |-- text: string (nullable = true)
 |-- original_text_length: integer (nullable = true)

+--------------------+--------------------+-----+--------------------+--------------------+
|                path|            raw_text|index|                text|original_text_length|
+--------------------+--------------------+-----+--------------------+--------------------+
|file:/home/jovyan...|��A d v a n c e  ...|    0|��Advance Auto Pa...|               63600|
|file:/home/jovyan...|��C 3 . a i   ( N...|    1|��C3.ai (NYSE:AI)...|               61685|
|file:/home/jovyan...|��A p p l i e d  ...|    2|��Applied Materia...|               27348|
|file:/home/jovyan...|��A m e r i c a n...|    3|��American Superc...|               39742|
|file:/home/jovyan...|��A r i s t a   N...|    4|��Arista Networks...|               32282|
+--------------------+--------------------+-----+--------

In [9]:
# Join tables from mysql with pyspark table
jdbcJoinedTable = jdbcDF.join(textdf, jdbcDF.local_filepath == textdf.path, 'left').select(jdbcDF.local_filepath,jdbcDF.companyid,jdbcDF.ticker,jdbcDF.company_name,jdbcDF.filepath, textdf.text,textdf.original_text_length) 
jdbcJoinedTable.show()

+--------------------+---------+------+--------------------+--------+--------------------+--------------------+
|      local_filepath|companyid|ticker|        company_name|filepath|                text|original_text_length|
+--------------------+---------+------+--------------------+--------+--------------------+--------------------+
|file:/home/jovyan...|        1|   AAP|Advance Auto Part...| AAP.txt|��Advance Auto Pa...|               63600|
|file:/home/jovyan...|        2|    AI|               C3.ai|  AI.txt|��C3.ai (NYSE:AI)...|               61685|
|file:/home/jovyan...|        3|  AMAT|Applied Materials...|AMAT.txt|��Applied Materia...|               27348|
|file:/home/jovyan...|        4|  AMSC|American Supercon...|AMSC.txt|��American Superc...|               39742|
|file:/home/jovyan...|        5|  ANET|Arista Networks, ...|ANET.txt|��Arista Networks...|               32282|
|file:/home/jovyan...|        6|  AVGO|       Broadcom Ltd |AVGO.txt|��Broadcom Ltd (N...|              

In [10]:
import gc
gc.collect()
spark.catalog.clearCache()

In [11]:

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml import Pipeline

assembler = DocumentAssembler()\
    .setInputCol('text')\
    .setOutputCol('doc')
t5 = T5Transformer() \
    .pretrained("t5_small") \
    .setTask("summarize:")\
    .setMaxOutputLength(200)\
    .setInputCols(["doc"]) \
    .setOutputCol("summaries")

pipeline = Pipeline().setStages([assembler, t5])
results = pipeline.fit(jdbcJoinedTable).transform(jdbcJoinedTable)
results.printSchema()

t5_small download started this may take some time.
Approximate size to download 139 MB
[OK!]
root
 |-- local_filepath: string (nullable = true)
 |-- companyid: integer (nullable = true)
 |-- ticker: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- filepath: string (nullable = true)
 |-- text: string (nullable = true)
 |-- original_text_length: integer (nullable = true)
 |-- doc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- summaries: array (nullable = true)
 |    |-- element: struct (contai

In [12]:
# Summary of Earnings call Transcripts for Broadcom Q
results.filter(col("ticker") == "AMAT").select(['summaries.result']).toPandas()['result'][0][0]

'Applied Materials, Inc. is a company that has a diversified portfolio of products . a lot of people are embracing the technology, and the technology is a key driver . a lot of people are embracing the technology, and the technology is a key driver .'