In [1]:
!pip install faker

Collecting faker
  Downloading Faker-33.1.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-33.1.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m18.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-33.1.0


In [2]:
import random
from datetime import datetime, timedelta
from faker import Faker
import json
import gzip

# Initialize Faker for generating fake text
fake = Faker()

# Define the bounding box (Envelope)
lat_min, lat_max = 20.01, 60.01
lon_min, lon_max = -100.01, -70.01

# Corrected time window
time_start = datetime.strptime('2024-06-24T09:10:00', '%Y-%m-%dT%H:%M:%S')
time_end = datetime.strptime('2024-10-28T09:04:00', '%Y-%m-%dT%H:%M:%S')

# Function to generate random coordinates within the bounding box (Envelope)
def generate_coordinates():
    lat = random.uniform(lat_min, lat_max)
    lon = random.uniform(lon_min, lon_max)
    return {"coordinates": [lon, lat], "type": "Point"}

# Function to generate random timestamps within the time range
def generate_timestamp():
    delta = time_end - time_start
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return time_start + timedelta(seconds=random_seconds)

# Function to generate fake tweet data
def generate_fake_tweet():
    return {
        "created_at": generate_timestamp().strftime('%a %b %d %H:%M:%S +0000 %Y'),
        "id": random.randint(1000000000000000000, 9999999999999999999),
        "id_str": str(random.randint(1000000000000000000, 9999999999999999999)),
        "text": fake.text(max_nb_chars=140),
        "display_text_range": [0, 140],
        "coordinates": generate_coordinates()
    }

# Generate fake tweet data (100 tweets)
tweets = [generate_fake_tweet() for _ in range(1000)]

# Save the tweets to a .txt.gz file
with gzip.open('/content/tweets.txt.gz', 'wt', encoding='utf-8') as f:
    for tweet in tweets:
        f.write(json.dumps(tweet) + "\n")




In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"
!pip install findspark
import findspark
findspark.init()
!pip install apache-sedona[spark]

from sedona.spark import *
config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-3.0_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting apache-sedona[spark]
  Downloading apache_sedona-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.0 kB)
Downloading apache_sedona-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (190 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.0/190.0 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: apache-sedona
Successfully installed apache-sedona-1.7.0


In [4]:
!gunzip -c "/content/tweets.txt.gz" | head -n 5

{"created_at": "Sat Oct 12 00:01:19 +0000 2024", "id": 8718763390561637260, "id_str": "8411290423276739245", "text": "Budget answer officer offer your. In per culture believe.", "display_text_range": [0, 140], "coordinates": {"coordinates": [-98.47952146291117, 46.26051200517337], "type": "Point"}}
{"created_at": "Tue Jul 30 11:36:27 +0000 2024", "id": 3940409734848117741, "id_str": "6057903065815082049", "text": "Sound million base customer up program.\nLittle without hundred crime onto sell structure.", "display_text_range": [0, 140], "coordinates": {"coordinates": [-83.97115765804325, 34.82872707099886], "type": "Point"}}
{"created_at": "Fri Oct 25 12:35:07 +0000 2024", "id": 4173105947408061245, "id_str": "1392672417900636898", "text": "Five thought institution difficult. I staff husband lot production executive within.", "display_text_range": [0, 140], "coordinates": {"coordinates": [-86.3208111646129, 20.448958634975163], "type": "Point"}}
{"created_at": "Wed Sep 11 14:44:40 +000

In [5]:
from sedona.spark import *

In [6]:
test_df = sedona.read\
    .option("delimiter", "\t")\
    .option("header", "true")\
    .option("compression", "gzip")\
    .json("/content/tweets.txt.gz")

In [7]:
test_df.show(5, truncate=False)

+------------------------------------------------+------------------------------+------------------+-------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------+
|coordinates                                     |created_at                    |display_text_range|id                 |id_str             |text                                                                                                                        |
+------------------------------------------------+------------------------------+------------------+-------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------+
|{[-98.47952146291117, 46.26051200517337], Point}|Sat Oct 12 00:01:19 +0000 2024|[0, 140]          |8718763390561637260|8411290423276739245|Budget answer officer offer your. In per culture believe.     

In [8]:
test_df.count()

1000

In [9]:
test_df.printSchema()

root
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- id: decimal(20,0) (nullable = true)
 |-- id_str: string (nullable = true)
 |-- text: string (nullable = true)



In [10]:
from pyspark.sql.functions import col, to_timestamp, lit, concat

In [11]:
test_df.select(
    col("created_at"),
    col("coordinates.coordinates").getItem(0).alias("longitude"),
    col("coordinates.coordinates").getItem(1).alias("latitude")
).where(col("coordinates").isNotNull()).show(5)

+--------------------+------------------+------------------+
|          created_at|         longitude|          latitude|
+--------------------+------------------+------------------+
|Sat Oct 12 00:01:...|-98.47952146291117| 46.26051200517337|
|Tue Jul 30 11:36:...|-83.97115765804325| 34.82872707099886|
|Fri Oct 25 12:35:...| -86.3208111646129|20.448958634975163|
|Wed Sep 11 14:44:...|-97.70903257132711| 39.83278656870034|
|Mon Jul 29 06:14:...|-91.07016218118376| 44.45065873135974|
+--------------------+------------------+------------------+
only showing top 5 rows



In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

In [13]:
parsed_df = test_df.withColumn("created_at",to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
parsed_df.show(5)

+--------------------+-------------------+------------------+-------------------+-------------------+--------------------+
|         coordinates|         created_at|display_text_range|                 id|             id_str|                text|
+--------------------+-------------------+------------------+-------------------+-------------------+--------------------+
|{[-98.47952146291...|2024-10-12 00:01:19|          [0, 140]|8718763390561637260|8411290423276739245|Budget answer off...|
|{[-83.97115765804...|2024-07-30 11:36:27|          [0, 140]|3940409734848117741|6057903065815082049|Sound million bas...|
|{[-86.32081116461...|2024-10-25 12:35:07|          [0, 140]|4173105947408061245|1392672417900636898|Five thought inst...|
|{[-97.70903257132...|2024-09-11 14:44:40|          [0, 140]|3211146502002326465|7573386808769317837|Game behind range...|
|{[-91.07016218118...|2024-07-29 06:14:55|          [0, 140]|5394156259522975603|8278579319866714861|Want doctor until...|
+---------------

In [14]:
result_df = parsed_df.select(

    col("coordinates.coordinates").getItem(0).alias("longitude"),
    col("coordinates.coordinates").getItem(1).alias("latitude"),
     col("created_at")
).where(col("coordinates").isNotNull())
result_df.show(3)


+------------------+------------------+-------------------+
|         longitude|          latitude|         created_at|
+------------------+------------------+-------------------+
|-98.47952146291117| 46.26051200517337|2024-10-12 00:01:19|
|-83.97115765804325| 34.82872707099886|2024-07-30 11:36:27|
| -86.3208111646129|20.448958634975163|2024-10-25 12:35:07|
+------------------+------------------+-------------------+
only showing top 3 rows



In [15]:
test_df.sparkSession.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [16]:
import shutil
# Delete the output1 directory if it exists
if os.path.exists("output1"):
    shutil.rmtree("output1")
    print("Directory 'output1' deleted successfully.")
else:
    print("Directory 'output1' does not exist.")



Directory 'output1' does not exist.


In [17]:
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter
result_df.write.csv("output1", header=False)
data = PointRDD(sedona.sparkContext, 'output1', 0, FileDataSplitter.CSV, True)
all_records = data.rawSpatialRDD.collect()
all_records[0:5]

[Geometry: Point userData: 2024-10-12T00:01:19.000Z,
 Geometry: Point userData: 2024-07-30T11:36:27.000Z,
 Geometry: Point userData: 2024-10-25T12:35:07.000Z,
 Geometry: Point userData: 2024-09-11T14:44:40.000Z,
 Geometry: Point userData: 2024-07-29T06:14:55.000Z]

In [18]:
type(data)

In [19]:
type(all_records)

list

In [20]:
all_records[0].userData

'2024-10-12T00:01:19.000Z'

In [21]:
#https://sedona.apache.org/1.0.0-1.2.0-incubating/tutorial/core-python/
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQuery

In [22]:
range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False  ## Only return gemeotries fully covered by the window
using_index = False
query_result = RangeQuery.SpatialRangeQuery(data, range_query_window, consider_boundary_intersection, using_index)
query_result.map(lambda x: (x.geom,x.userData)).collect()[0:10]

[(<POINT (-83.971 34.829)>, '2024-07-30T11:36:27.000Z'),
 (<POINT (-86.882 34.918)>, '2024-07-23T10:30:16.000Z'),
 (<POINT (-87.618 31.902)>, '2024-07-28T12:06:15.000Z'),
 (<POINT (-86.182 34.794)>, '2024-08-19T23:16:40.000Z'),
 (<POINT (-81.531 36.759)>, '2024-07-02T11:51:33.000Z'),
 (<POINT (-86.854 30.604)>, '2024-08-28T06:02:57.000Z'),
 (<POINT (-83.781 35.98)>, '2024-10-12T11:37:43.000Z'),
 (<POINT (-86.051 34.564)>, '2024-10-24T19:47:22.000Z'),
 (<POINT (-85.918 33.854)>, '2024-07-10T16:45:33.000Z'),
 (<POINT (-85.648 39.993)>, '2024-10-07T15:59:29.000Z')]

In [23]:
from datetime import datetime
def convert_to_datetime(timestamp_str):
    return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%fZ')

time_start = datetime.strptime('2024-10-01T09:04:00', '%Y-%m-%dT%H:%M:%S')
time_end = datetime.strptime('2024-10-24T09:10:00', '%Y-%m-%dT%H:%M:%S')
filtered_result = query_result.filter(
    lambda row: time_start <= convert_to_datetime(row.userData) <= time_end
)
filtered_result.map(lambda x: (x.geom,x.userData)).collect()[0:10]

[(<POINT (-83.781 35.98)>, '2024-10-12T11:37:43.000Z'),
 (<POINT (-85.648 39.993)>, '2024-10-07T15:59:29.000Z'),
 (<POINT (-83.092 34.289)>, '2024-10-18T07:40:13.000Z'),
 (<POINT (-87.078 36.502)>, '2024-10-18T15:10:11.000Z'),
 (<POINT (-86.068 33.82)>, '2024-10-20T22:18:14.000Z'),
 (<POINT (-85.623 38.765)>, '2024-10-18T05:11:31.000Z'),
 (<POINT (-87.437 36.482)>, '2024-10-17T11:02:12.000Z'),
 (<POINT (-84.489 37.632)>, '2024-10-23T06:57:30.000Z'),
 (<POINT (-84.786 31.454)>, '2024-10-10T04:35:30.000Z'),
 (<POINT (-85.493 34.998)>, '2024-10-02T13:06:49.000Z')]

In [24]:
import geopandas as gpd
gpd.GeoDataFrame(
    filtered_result.map(lambda x: [x.geom, x.userData]).collect(),
    columns=["geom", "user_data"],
    geometry="geom"
)


Unnamed: 0,geom,user_data
0,POINT (-83.78059 35.98005),2024-10-12T11:37:43.000Z
1,POINT (-85.64765 39.9926),2024-10-07T15:59:29.000Z
2,POINT (-83.09233 34.28872),2024-10-18T07:40:13.000Z
3,POINT (-87.0781 36.50153),2024-10-18T15:10:11.000Z
4,POINT (-86.0684 33.82005),2024-10-20T22:18:14.000Z
5,POINT (-85.62332 38.76512),2024-10-18T05:11:31.000Z
6,POINT (-87.4373 36.48194),2024-10-17T11:02:12.000Z
7,POINT (-84.48886 37.63183),2024-10-23T06:57:30.000Z
8,POINT (-84.78625 31.45374),2024-10-10T04:35:30.000Z
9,POINT (-85.49307 34.99823),2024-10-02T13:06:49.000Z
