## Importing Packages

In [2]:
import json
import os
import sys
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pyspark
import requests
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import trim
from pyspark.sql.types import *

In [3]:
# Show which PySpark release this kernel has imported.
pyspark.__version__

'3.2.3'

## Starting Spark Session

In [4]:
# Spark starts its Python worker processes with the interpreter named by
# PYSPARK_PYTHON and otherwise falls back to whatever `python3` resolves to on
# PATH. When that is not the interpreter running this kernel, every job that
# needs a Python worker aborts with "Python worker failed to connect back".
# The variable is read when the SparkContext is created, so it has to be set
# before getOrCreate(); setdefault keeps any value configured outside the notebook.
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)

# Create the session for this notebook, or reuse one already running in the kernel.
spark = SparkSession.builder.appName('CNEO_data_extractor').getOrCreate()
spark

In [5]:
# date = datetime.today().date()
# date_min = date + timedelta(days=59)
# date_max = date + timedelta(days=60)
# print(date_min, date_max)

## Importing Previous/Old/Available Data

In [6]:
# Load the previously collected records. The first CSV row supplies the
# column names and the column types are inferred from the values.
stored_csv_path = './raw_input_data.csv'
sdfl = spark.read.csv(stored_csv_path, header=True, inferSchema=True)
sdfl.show(2)

+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+-----------------+
|Designation|Orbit Id|Time of Close approach|Close-Approach Date|Nominal Approch distance (au)|Min Close-Approach Distance (au)|Max Close-Approach Distance (au)|V Reletive (Km/s)|V Infinite (Km/s)|Close-Approach Uncertain Time|Absolute Magnitude (mag)|Diameter (Km)|Diameter-Sigma (Km)|           Object|
+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+-----------------+
|   2020 BN7|       6|     2415023.594589649|1900-01-04 02:16:00|           0.0896607

#### Some Data cleaning

In [7]:
# Replace nulls with the sentinel -1. With an integer fill value Spark only
# touches numeric columns, so string columns keep their nulls.
sdfl = sdfl.na.fill(-1)
sdfl.show(2)

+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+-----------------+
|Designation|Orbit Id|Time of Close approach|Close-Approach Date|Nominal Approch distance (au)|Min Close-Approach Distance (au)|Max Close-Approach Distance (au)|V Reletive (Km/s)|V Infinite (Km/s)|Close-Approach Uncertain Time|Absolute Magnitude (mag)|Diameter (Km)|Diameter-Sigma (Km)|           Object|
+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+-----------------+
|   2020 BN7|       6|     2415023.594589649|1900-01-04 02:16:00|           0.0896607

In [8]:
# Strip leading and trailing spaces from the Object column.
sdfl = sdfl.withColumn('Object', trim(sdfl['Object']))

#### Extracting Last Record's Date

In [13]:
# Use the close-approach date of the last stored row as the lower bound for
# the next API query.
# NOTE(review): this assumes the CSV is ordered chronologically, and because
# the bound is the last stored day itself, that day's records are fetched
# again -- confirm duplicates are handled downstream.
lr = sdfl.tail(1)
if not lr:
    raise ValueError("raw_input_data.csv has no rows; cannot derive the last close-approach date")

last_value = lr[0]['Close-Approach Date']
if last_value is None:
    raise ValueError("the last stored row has no 'Close-Approach Date' value")

# str() makes this work whether inferSchema read the column as text or as a
# timestamp (a datetime has no .split); either way the date is the part
# before the first space.
last_date = str(last_value).split(' ')[0]
last_date


'2023-01-02'

In [14]:
# Preview the first two rows after the cleaning steps above.
sdfl.show(2)

+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+----------+
|Designation|Orbit Id|Time of Close approach|Close-Approach Date|Nominal Approch distance (au)|Min Close-Approach Distance (au)|Max Close-Approach Distance (au)|V Reletive (Km/s)|V Infinite (Km/s)|Close-Approach Uncertain Time|Absolute Magnitude (mag)|Diameter (Km)|Diameter-Sigma (Km)|    Object|
+-----------+--------+----------------------+-------------------+-----------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------------------+------------------------+-------------+-------------------+----------+
|   2020 BN7|       6|     2415023.594589649|1900-01-04 02:16:00|           0.0896607474147164|           

In [9]:
# temp_df = sdfl.toPandas()



## Extracting Today's Data using API Call


In [15]:
# Ask the JPL close-approach endpoint for everything between the last stored
# date and today.
url = "https://ssd-api.jpl.nasa.gov/cad.api"
parameters = {
    "date-min": last_date,
    "date-max": str(datetime.today().date()),
    # Defined exactly once: a repeated key in a dict literal silently keeps
    # only the last value, and "0.1" is the value that was effectively sent.
    "dist-max": "0.1",
    "fullname": "true",
    "diameter": "true",
}
# The timeout stops a stalled connection from hanging the kernel, and
# raise_for_status() surfaces HTTP errors instead of failing later inside
# .json() or on a missing key.
response = requests.get(url, params=parameters, timeout=30)
response.raise_for_status()
data = response.json()


In [16]:
# Number of records in the response. The recorded output shows it as a
# string ('13'), so cast with int() before comparing or doing arithmetic.
data['count']

'13'

In [17]:
# Descriptive column name paired with its intended Spark type, in the order
# the values appear in each record. The spellings match the stored CSV
# header and must not be "corrected" here.
column_spec = [
    ('Designation', StringType()),
    ('Orbit Id', StringType()),
    ('Time of Close approach', DoubleType()),
    ('Close-Approach Date', DateType()),
    ('Nominal Approch distance (au)', DoubleType()),
    ('Min Close-Approach Distance (au)', DoubleType()),
    ('Max Close-Approach Distance (au)', DoubleType()),
    ('V Reletive (Km/s)', DoubleType()),
    ('V Infinite (Km/s)', DoubleType()),
    ('Close-Approach Uncertain Time', StringType()),
    ('Absolute Magnitude (mag)', DoubleType()),
    ('Diameter (Km)', DoubleType()),
    ('Diameter-Sigma (Km)', DoubleType()),
    ('Object', StringType()),
]
columns = [column_name for column_name, _ in column_spec]
types = [spark_type for _, spark_type in column_spec]
# Name -> type lookup built from the same pairs.
dic = dict(zip(columns, types))
dic

{'Designation': StringType,
 'Orbit Id': StringType,
 'Time of Close approach': DoubleType,
 'Close-Approach Date': DateType,
 'Nominal Approch distance (au)': DoubleType,
 'Min Close-Approach Distance (au)': DoubleType,
 'Max Close-Approach Distance (au)': DoubleType,
 'V Reletive (Km/s)': DoubleType,
 'V Infinite (Km/s)': DoubleType,
 'Close-Approach Uncertain Time': StringType,
 'Absolute Magnitude (mag)': DoubleType,
 'Diameter (Km)': DoubleType,
 'Diameter-Sigma (Km)': DoubleType,
 'Object': StringType}

In [31]:
# Explicit Spark schema for the API records: one nullable field per column.
# NOTE(review): 'Close-Approach Date' is a DateType here, while the stored CSV
# preview shows a time of day in that column -- confirm that dropping the
# time component is intended.
schema_spec = (
    ('Designation', StringType()),
    ('Orbit Id', StringType()),
    ('Time of Close approach', DoubleType()),
    ('Close-Approach Date', DateType()),
    ('Nominal Approch distance (au)', DoubleType()),
    ('Min Close-Approach Distance (au)', DoubleType()),
    ('Max Close-Approach Distance (au)', DoubleType()),
    ('V Reletive (Km/s)', DoubleType()),
    ('V Infinite (Km/s)', DoubleType()),
    ('Close-Approach Uncertain Time', StringType()),
    ('Absolute Magnitude (mag)', DoubleType()),
    ('Diameter (Km)', DoubleType()),
    ('Diameter-Sigma (Km)', DoubleType()),
    ('Object', StringType()),
)
myScehma = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_spec]
)
myScehma

StructType(List(StructField(Designation,StringType,true),StructField(Orbit Id,StringType,true),StructField(Time of Close approach,DoubleType,true),StructField(Close-Approach Date,DateType,true),StructField(Nominal Approch distance (au),DoubleType,true),StructField(Min Close-Approach Distance (au),DoubleType,true),StructField(Max Close-Approach Distance (au),DoubleType,true),StructField(V Reletive (Km/s),DoubleType,true),StructField(V Infinite (Km/s),DoubleType,true),StructField(Close-Approach Uncertain Time,StringType,true),StructField(Absolute Magnitude (mag),DoubleType,true),StructField(Diameter (Km),DoubleType,true),StructField(Diameter-Sigma (Km),DoubleType,true),StructField(Object,StringType,true)))

In [19]:
# Pivot the row-oriented API payload into {column name: [values...]}.
# Position k of every record is filed under columns[k], so this relies on the
# API returning fields in the same order as `columns`.
t = data['data']
if t:
    modified_raw_data = {
        column_name: [record[position] for record in t]
        for position, column_name in enumerate(columns)
    }
else:
    # With no records, no keys are created at all.
    modified_raw_data = {}
modified_raw_data

{'Designation': ['2022 YT3',
  '2022 YU3',
  '2022 YZ3',
  '2022 YY6',
  '2022 YP5',
  '2021 NF',
  '2022 YJ4',
  '2015 AO43',
  '2011 WR41',
  '2019 AY3',
  '2022 YS4',
  '2022 XW1',
  '2018 PN22'],
 'Orbit Id': ['7',
  '5',
  '3',
  '4',
  '6',
  '6',
  '5',
  '9',
  '11',
  '7',
  '5',
  '3',
  '6'],
 'Time of Close approach': ['2459946.529982869',
  '2459946.716108034',
  '2459946.895485196',
  '2459946.962983696',
  '2459947.244799544',
  '2459947.410115729',
  '2459947.741992184',
  '2459947.934757015',
  '2459948.221946792',
  '2459949.029916917',
  '2459949.348464181',
  '2459949.356584550',
  '2459949.488279088'],
 'Close-Approach Date': ['2023-Jan-02 00:43',
  '2023-Jan-02 05:11',
  '2023-Jan-02 09:29',
  '2023-Jan-02 11:07',
  '2023-Jan-02 17:53',
  '2023-Jan-02 21:51',
  '2023-Jan-03 05:48',
  '2023-Jan-03 10:26',
  '2023-Jan-03 17:20',
  '2023-Jan-04 12:43',
  '2023-Jan-04 20:22',
  '2023-Jan-04 20:33',
  '2023-Jan-04 23:43'],
 'Nominal Approch distance (au)': ['0.02846575

In [25]:
# Build a pandas frame straight from the response: 'data' holds the rows and
# 'fields' holds the API's own short column names.
api_rows = data['data']
api_fields = data['fields']
df = pd.DataFrame(api_rows, columns=api_fields)
df.head()

Unnamed: 0,des,orbit_id,jd,cd,dist,dist_min,dist_max,v_rel,v_inf,t_sigma_f,h,diameter,diameter_sigma,fullname
0,2022 YT3,7,2459946.529982869,2023-Jan-02 00:43,0.0284657568010055,0.0283518611759164,0.0285796483222,6.43976919501424,6.42521761094422,< 00:01,25.814,,,(2022 YT3)
1,2022 YU3,5,2459946.716108034,2023-Jan-02 05:11,0.0251910694591085,0.0250168666339067,0.0253652623678832,7.13378641737377,7.11894423823365,< 00:01,25.835,,,(2022 YU3)
2,2022 YZ3,3,2459946.895485196,2023-Jan-02 09:29,0.0605991524261425,0.0601178894510354,0.0610804106086901,11.7046594149789,11.7009022805467,< 00:01,25.088,,,(2022 YZ3)
3,2022 YY6,4,2459946.962983696,2023-Jan-02 11:07,0.0054388636045272,0.0054110703014648,0.0054666562280343,20.2695510903181,20.245367587082,< 00:01,26.109,,,(2022 YY6)
4,2022 YP5,6,2459947.244799544,2023-Jan-02 17:53,0.0199773082677155,0.0199188916322011,0.0200357211576115,4.79371109446374,4.76580690667688,< 00:01,27.085,,,(2022 YP5)


In [26]:
# Every value arrives from the API as text, so give each column a real dtype.
numeric_fields = [
    'jd', 'dist', 'dist_min', 'dist_max',
    'v_rel', 'v_inf', 'h', 'diameter', 'diameter_sigma',
]
for field in numeric_fields:
    df[field] = pd.to_numeric(df[field])

# Calendar date of the approach (strings such as '2023-Jan-02 00:43').
df['cd'] = pd.to_datetime(df['cd'])
# The uncertainty is free-form text (e.g. '< 00:01', '6_22:43'); keep it as str.
df['t_sigma_f'] = df['t_sigma_f'].astype(str)
df.tail()

Unnamed: 0,des,orbit_id,jd,cd,dist,dist_min,dist_max,v_rel,v_inf,t_sigma_f,h,diameter,diameter_sigma,fullname
8,2011 WR41,11,2459948.0,2023-01-03 17:20:00,0.040538,0.04053,0.111676,8.948828,8.94148,6_22:43,25.12,,,(2011 WR41)
9,2019 AY3,7,2459949.0,2023-01-04 12:43:00,0.042987,0.018714,0.078098,19.742852,19.739712,1_14:23,23.8,,,(2019 AY3)
10,2022 YS4,5,2459949.0,2023-01-04 20:22:00,0.015371,0.015204,0.015538,6.797318,6.771768,< 00:01,25.583,,,(2022 YS4)
11,2022 XW1,3,2459949.0,2023-01-04 20:33:00,0.091095,0.090545,0.091645,6.042223,6.03738,00:07,25.175,,,(2022 XW1)
12,2018 PN22,6,2459949.0,2023-01-04 23:43:00,0.076661,0.076414,0.076908,3.782524,3.773324,02:23,27.5,,,(2018 PN22)


In [28]:
# Swap the API's short field names for the descriptive headers in `columns`.
# This renames df in place, so the cells above that index by 'jd', 'cd', ...
# cannot be re-run afterwards without rebuilding df first.
df.columns = columns
df.head(1)

Unnamed: 0,Designation,Orbit Id,Time of Close approach,Close-Approach Date,Nominal Approch distance (au),Min Close-Approach Distance (au),Max Close-Approach Distance (au),V Reletive (Km/s),V Infinite (Km/s),Close-Approach Uncertain Time,Absolute Magnitude (mag),Diameter (Km),Diameter-Sigma (Km),Object
0,2022 YT3,7,2459947.0,2023-01-02 00:43:00,0.028466,0.028352,0.02858,6.439769,6.425218,< 00:01,25.814,,,(2022 YT3)


In [32]:
# Hand the pandas frame to Spark with the explicit schema instead of letting
# Spark infer the types. Displaying nsdf only prints its schema.
nsdf = spark.createDataFrame(data=df, schema=myScehma)
nsdf


DataFrame[Designation: string, Orbit Id: string, Time of Close approach: double, Close-Approach Date: date, Nominal Approch distance (au): double, Min Close-Approach Distance (au): double, Max Close-Approach Distance (au): double, V Reletive (Km/s): double, V Infinite (Km/s): double, Close-Approach Uncertain Time: string, Absolute Magnitude (mag): double, Diameter (Km): double, Diameter-Sigma (Km): double, Object: string]

In [33]:
# Print the first row of nsdf.
# NOTE(review): the recorded run of this cell aborted with "Python worker
# failed to connect back" -- check which interpreter Spark uses for its
# workers (PYSPARK_PYTHON) before relying on this output.
nsdf.show(1)

Py4JJavaError: An error occurred while calling o116.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 13) (VAibhAv executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:492)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:445)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 29 more


In [24]:
# Freeze every API record into a tuple so it can be handed to Spark as a row.
temp = list(map(tuple, data['data']))
temp

[('2022 YJ4',
  '5',
  '2459947.741992184',
  '2023-Jan-03 05:48',
  '0.0135605223870528',
  '0.0135184677947036',
  '0.0136025737320697',
  '5.57880952862473',
  '5.54347723391237',
  '< 00:01',
  '26.916',
  None,
  None,
  '       (2022 YJ4)'),
 ('2015 AO43',
  '9',
  '2459947.934757015',
  '2023-Jan-03 10:26',
  '0.0826054551199632',
  '0.0325049787467851',
  '0.135505833277371',
  '8.46115194400779',
  '8.45733889836805',
  '9_07:59',
  '26.5',
  None,
  None,
  '       (2015 AO43)'),
 ('2011 WR41',
  '11',
  '2459948.221946792',
  '2023-Jan-03 17:20',
  '0.0405377113458668',
  '0.0405302257042255',
  '0.111676045470241',
  '8.94882792980961',
  '8.94147999472359',
  '6_22:43',
  '25.12',
  None,
  None,
  '       (2011 WR41)')]

In [26]:
# Distribute the list of row tuples as an RDD.
spark_context = spark.sparkContext
rdd = spark_context.parallelize(temp)


In [27]:
# Turn the RDD into a DataFrame, using `columns` as the field names and
# letting Spark infer each field's type from the data.
nsdfrdd = spark.createDataFrame(rdd, schema=columns)
nsdfrdd.printSchema()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 9) (VAibhAv executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 14 more


In [54]:
# nsdf = spark.createDataFrame(modified_raw_data, schema=myScehma)
# nsdf

In [None]:
# Same frame as before, but routed through a NumPy array of the records first.
api_matrix = np.array(data['data'])
df = pd.DataFrame(api_matrix, columns=data['fields'])
df.head()

In [None]:
# Convert the text values from the API into numeric / datetime dtypes.
numeric_fields = [
    'jd', 'dist', 'dist_min', 'dist_max',
    'v_rel', 'v_inf', 'h', 'diameter', 'diameter_sigma',
]
for field in numeric_fields:
    df[field] = pd.to_numeric(df[field])

# Calendar date of the approach.
df['cd'] = pd.to_datetime(df['cd'])
df.tail()

In [None]:
# Load the stored CSV with pandas. This rebinds `df`, replacing whatever
# frame it referred to before.
df = pd.read_csv('./raw_input_data.csv')
# Displays the whole frame (pandas truncates long output).
df

In [None]:
# Overwrite the frame's header with the names in `columns` (in place).
# pandas raises ValueError if df does not have exactly len(columns) columns.
df.columns = columns
df

In [None]:
# Write df to ./raw_input_data.csv without the index column. This is the same
# path the notebook reads its stored data from, so the file is overwritten.
df.to_csv('./raw_input_data.csv', index=False)

In [None]:
# psdf = ps.DataFrame(np.array(data['data']))
# psdf.head()

In [None]:
# Read the CSV back into Spark, taking column names from the header row and
# inferring the column types.
csv_location = './raw_input_data.csv'
sdf = spark.read.csv(csv_location, header=True, inferSchema=True)
sdf

In [None]:
# Preview the first two rows as loaded.
sdf.show(2)

In [None]:
# Replace nulls with the sentinel -1. An integer fill value only affects
# numeric columns.
sdf = sdf.na.fill(-1)

In [None]:
# Preview the first two rows again, now with nulls replaced.
sdf.show(2)

In [None]:
# psdf = ps.DataFrame(
#     {'a': [1, 2, 3, 4, 5, 6],
#      'b': [100, 200, 300, 400, 500, 600],
#      'c': ["one", "two", "three", "four", "five", "six"]})

In [None]:
# Assign the descriptive headers from `columns` to df (in place).
# NOTE(review): this repeats an earlier cell; it is a no-op when df already
# carries these names.
df.columns = columns
df

In [None]:
# with open('data.json', 'w') as datafile:
#     json.dump(data['data'], datafile)

In [None]:
# psdf = ps.from_pandas(df)

In [None]:
# sdf = spark.read.schema(myScehma).json('./data.json')
# sdf.printSchema()

# Convert the pandas frame to a Spark DataFrame. No schema is passed, so
# Spark derives the column types from the pandas data.
sdf = spark.createDataFrame(df)


In [None]:
# Print the DataFrame using show()'s default row limit.
sdf.show()

In [None]:
# Turn on Arrow-based conversion between pandas and Spark. On Spark 3.x the
# setting is spelled "spark.sql.execution.arrow.pyspark.enabled"; the older
# "spark.sql.execution.arrow.enabled" key is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sdf = spark.createDataFrame(df)


In [None]:
# Show the column names and types Spark ended up with.
sdf.printSchema()


In [None]:

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Scratch check on a tiny frame: does pandas -> Spark -> pandas conversion
# work in this environment at all?
# The sample rows get their own name so that `data` (the API response used by
# other cells) is not overwritten when this cell runs.
people_rows = [['Scott', 50], ['Jeff', 45], ['Thomas', 54], ['Ann', 34]]

# Create the pandas DataFrame
pandasDF = pd.DataFrame(people_rows, columns=['Name', 'Age'])

# print dataframe.
print(pandasDF)

# 1) Let Spark infer the schema from the pandas dtypes.
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()

# 2) Supply an explicit schema. Fields are matched by position, so the first
#    column is exposed as "First Name".
mySchema = StructType([
    StructField("First Name", StringType(), True),
    StructField("Age", IntegerType(), True),
])

sparkDF2 = spark.createDataFrame(pandasDF, schema=mySchema)
sparkDF2.printSchema()
sparkDF2.show()

# 3) Enable Apache Arrow for the pandas <-> Spark conversion. Spark 3.x names
#    the setting "spark.sql.execution.arrow.pyspark.enabled"; the older
#    "spark.sql.execution.arrow.enabled" key is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sparkDF2 = spark.createDataFrame(pandasDF)
sparkDF2.printSchema()
sparkDF2.show()

# Convert the Spark DataFrame back to pandas. toPandas must be *called*;
# without the parentheses this prints the bound method, not a frame.
pandasDF2 = sparkDF2.select("*").toPandas()
print(pandasDF2)



In [None]:
# Preview the first five rows of sdf.
sdf.show(5)

In [None]:
# NOTE(review): presumably the number of lunar distances in one astronomical
# unit (1 au is roughly 389.17 LD), for converting the au distance columns --
# confirm the intended unit before using it.
lunar_distance_multiplier = 389.174