# try to use spark and upload to elasticsearch

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, udf
from pyspark.sql.types import StringType
import os

# Assuming 'UnstructuredHTMLLoader' and 'Html2TextTransformer' are available for use
from langchain_community.document_loaders import UnstructuredHTMLLoader
from langchain_community.document_transformers import Html2TextTransformer

class CompanyFilingsIndexer:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("Elasticsearch Integration") \
            .config("spark.master", "local[*]") \
            .config("spark.jars", "elasticsearch-hadoop-8.12.1.jar") \
            .getOrCreate()
        
    def html_to_text(self, html_content):
        # Convert HTML content to plain text
        loader = UnstructuredHTMLLoader(html_content)
        doc = loader.load()
        html2text = Html2TextTransformer()
        docs_transformed = html2text.transform_documents(doc)
        return docs_transformed[0].page_content

    def process_and_index_documents(self, input_directory, company_name):
        """
        Reads HTML documents from a specified directory, converts them to plain text,
        and indexes them into Elasticsearch.
        """
        # Read all text files in the specified directory as a DataFrame
        df = self.spark.read.text(input_directory).withColumn("fileName", input_file_name())

        # Register the UDF for HTML to text conversion
        html_to_text_udf = udf(self.html_to_text, StringType())

        # Apply the HTML to text conversion UDF
        text_df = df.withColumn("text", html_to_text_udf(df["value"]))

        # Define the Elasticsearch index based on the company name
        es_index = f"filings_{company_name.lower()}"

        # Write the DataFrame to Elasticsearch
        text_df.write.format("org.elasticsearch.spark.sql")\
            .option("es.resource", f"{es_index}/_doc")\
            .option("es.nodes", "localhost")\
            .option("es.port", "9200")\
            .mode("append")\
            .save()

In [2]:
# Example usage
indexer = CompanyFilingsIndexer()

24/02/21 17:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
import bs4
soup = bs4.BeautifulSoup('edgar_docs/AMZN/10-K/2021-02-02.htm', 'html.parser')
soup.get_text()

  soup = bs4.BeautifulSoup('edgar_docs/AMZN/10-K/2021-02-02.htm', 'html.parser')


'edgar_docs/AMZN/10-K/2021-02-02.htm'

In [3]:
input_directory = "edgar_docs/AMZN/10-K/2021-02-02.htm"
indexer.process_and_index_documents(input_directory, "AMZN")

Traceback (most recent call last):
  File "/Users/rushilsheth/Documents/portfolio/busco-fin/venv/lib/python3.11/site-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rushilsheth/Documents/portfolio/busco-fin/venv/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/rushilsheth/Documents/portfolio/busco-fin/venv/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/rushilsheth/Documents/portfolio/busco-fin/venv/lib/python3.11/site-packages/pyspark/context.py", line 466, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variabl

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

24/02/21 17:05:59 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
import pandas as pd
file_path = "edgar_docs/AMZN/10-K/2021-02-02.htm"
doc_df = pd.read_csv('edgar_docs/document_info.csv')
company_df = pd.read_csv('edgar_docs/companies_df.csv')

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [4]:
doc_row = doc_df.loc[doc_df['localFilePath'] == file_path]
metadata = doc_row.to_dict(orient='records')[0]
metadata.update(company_df.loc[company_df['ticker'] == doc_row['ticker'].values[0]].to_dict(orient='records')[0])
metadata

{'ticker': 'AMZN',
 'formType': '10-K',
 'description': 'Form 10-K - Annual report [Section 13 and 15(d), not S-K Item 405]',
 'documentUrl': 'https://www.sec.gov/Archives/edgar/data/1018724/000101872421000004/amzn-20201231.htm',
 'filedAt': '2021-02-02T19:44:10-05:00',
 'periodOfReport': '2020-12-31',
 'localFilePath': 'edgar_docs/AMZN/10-K/2021-02-02.htm',
 'cik': 1018724,
 'companyName': 'AMAZON COM INC',
 'stateOfIncorporation': 'DE',
 'fiscalYearEnd': 1231,
 'sic': '5961 Retail-Catalog &amp; Mail-Order Houses'}

In [5]:
es_loader = DataLoader()
#es_loader.start_elasticsearch()

24/02/21 16:42:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
file_path = "edgar_docs/AMZN/10-K/2021-02-02.htm"
# metadata_df = es_loader.process_and_load(file_path, metadata)


In [7]:
# es_loader.stop_elasticsearch()

In [8]:
# es_loader.spark.sparkContext.getConf().getAll()

In [9]:
html_content = es_loader.read_html_file(file_path)

In [14]:
html_content

'Table of Contents UNITED STATES SECURITIES AND EXCHANGE COMMISSION Washington,\nD.C. 20549 ____________________________________ FORM 10-K\n____________________________________ (Mark One) ☒ ANNUAL REPORT PURSUANT TO\nSECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE ACT OF 1934 For the fiscal year\nended December 31, 2020 or ☐ TRANSITION REPORT PURSUANT TO SECTION 13 OR 15(d)\nOF THE SECURITIES EXCHANGE ACT OF 1934 For the transition period from to .\nCommission File No. 000-22513 ____________________________________ AMAZON.COM,\nINC. (Exact name of registrant as specified in its charter) Delaware\n91-1646860 (State or other jurisdiction of incorporation or organization)\n(I.R.S. Employer Identification No.) 410 Terry Avenue North Seattle,\nWashington 98109-5210 (206) 266-1000 (Address and telephone number, including\narea code, of registrant’s principal executive offices) Securities registered\npursuant to Section 12(b) of the Act: Title of Each Class Trading Symbol(s)\nName of Each Exch

24/02/21 16:42:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# trying new thing!

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField
from bs4 import BeautifulSoup

class HtmlFileProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("HTML to Text Processing") \
            .getOrCreate()

    @staticmethod
    @udf(returnType=StringType())
    def html_to_text(html_content):
        """
        Converts HTML content to plain text.
        """
        if html_content is not None:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.get_text()
        return None

    def process_html_files(self, file_paths):
        """
        Reads HTML content from files, converts to text, and returns a DataFrame of text content.
        """
        # Create an RDD from the list of file paths
        file_paths_rdd = self.spark.sparkContext.parallelize(file_paths)

        # Read the content of each file
        contents_rdd = file_paths_rdd.map(lambda path: (path, self.read_file_content(path)))

        # Convert the RDD to a DataFrame
        schema = StructType([
            StructField("path", StringType(), True),
            StructField("html_content", StringType(), True)
        ])
        html_df = self.spark.createDataFrame(contents_rdd, schema)

        # Convert HTML to text
        text_df = html_df.withColumn("text_content", self.html_to_text(html_df["html_content"]))

        return text_df

    @staticmethod
    def read_file_content(path):
        """
        Reads the content of a file given its path.
        """
        try:
            with open(path, 'r') as file:
                return file.read()
        except Exception as e:
            print(f"Error reading {path}: {e}")
            return None


In [2]:
# Assuming the HtmlFileProcessor class definition is already available as defined previously

# Initialize the processor
processor = HtmlFileProcessor()

# Define the list of HTML file paths to process
file_paths = ["edgar_docs/AMZN/10-K/2021-02-02.htm"]

24/02/21 17:12:40 WARN Utils: Your hostname, Rushils-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.84 instead (on interface en0)
24/02/21 17:12:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/21 17:12:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
from langchain_community.document_loaders import UnstructuredHTMLLoader
from langchain_community.document_transformers import Html2TextTransformer
# lets load the data individually by file
file_path = "edgar_docs/AMZN/10-K/2021-02-02.htm"
def html_to_text(file_path):
        loader = UnstructuredHTMLLoader(file_path)
        doc = loader.load()
        html2text = Html2TextTransformer()
        docs_transformed = html2text.transform_documents(doc)
        return docs_transformed[0].page_content
html_txt = html_to_text(file_path)

In [4]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder \
    .appName("Document Processing") \
    .config("spark.master", "local[*]") \
    .config("spark.jars", "elasticsearch-hadoop-8.12.1.jar") \
    .config("spark.es.nodes", "localhost") \
    .config("spark.jars", "elasticsearch-spark-30_2.12-8.12.1.jar") \
    .getOrCreate()

time.sleep(10)  # Wait for the Elasticsearch container to start


24/02/21 18:20:27 WARN Utils: Your hostname, Rushils-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.84 instead (on interface en0)
24/02/21 18:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/02/21 18:20:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
print(spark.sparkContext._jvm.scala.util.Properties.versionString())

version 2.12.18


24/02/21 18:20:40 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# Convert the HTML text to a DataFrame
schema = StructType([StructField("text", StringType(), True)])
data = [(html_txt,)]  # Create a list of tuples
rdd = spark.sparkContext.parallelize(data)  # Convert the list to an RDD
text_df = spark.createDataFrame(rdd, schema)

company_name = "AMZN"
# Define the Elasticsearch index based on the company name
es_index = f"filings_{company_name.lower()}"

# Write the DataFrame to Elasticsearch
text_df.write.format("org.elasticsearch.spark.sql")\
    .option("es.resource", es_index)\
    .option("es.nodes", "localhost")\
    .option("es.port", "9200")\
    .mode("append")\
    .save()

24/02/21 18:20:47 ERROR Executor: Exception in task 4.0 in stage 0.0 (TID 4)/ 8]
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [filings_amzn]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
	at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Ex

Py4JJavaError: An error occurred while calling o50.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.4.84 executor driver): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [filings_amzn]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
	at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2451)
	at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:629)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:107)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [filings_amzn]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
	at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


24/02/21 18:20:48 WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5) (192.168.4.84 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.4.84 executor driver): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [filings_amzn]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
	at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.sch

In [2]:
import subprocess
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, lit
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from bs4 import BeautifulSoup
from elasticsearch import Elasticsearch

import subprocess
from subprocess import Popen
import time
import signal
import os

class ElasticsearchLoader():
    def __init__(self, es_nodes='localhost', es_port='9200'):
        super().__init__()
        self.es_nodes = es_nodes
        self.es_port = es_port
        self.es_home = "/Users/rushilsheth/Documents/elasticsearch"  # Path to your Elasticsearch directory
        self.es_bin = self.es_home + "/bin/elasticsearch"  # Path to the Elasticsearch executable

    def start_elasticsearch(self):
        """
        Starts Elasticsearch using subprocess.
        """
        try:
            print("Starting Elasticsearch...")
            self.es_process = Popen([self.es_bin], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            print("Elasticsearch is starting. Please wait a few moments before it becomes available.")
            time.sleep(30)  # Wait for Elasticsearch to start
        except Exception as e:
            print(f"Failed to start Elasticsearch: {e}")

In [3]:
loader=ElasticsearchLoader()
loader.start_elasticsearch()

Starting Elasticsearch...
Elasticsearch is starting. Please wait a few moments before it becomes available.


In [7]:
#loader.es_process.terminate()  # Terminate the Elasticsearch process