In [2]:
from datetime import datetime
import json
import boto3 
from botocore.client import Config

localstack_url = 'http://localhost:4566'

data = [
    {"name": "Alice", "age": 25, "email": "alice@gmail.com"},
    {"name": "Bob", "age": 30, "email": "bob@gmail.com"},
    {"name": "Cathy", "age": 22, "email": "cathy@gmail.com"}
]


def store_s3_json(s3_client, items: list, bucket_name: str):
    try:
        current_date = datetime.now().strftime("%Y/%m/%d")
        prefix = f"stripe/{current_date}/type=json"
        file_name = f"{prefix}/users_{datetime.now().strftime('%H%M%S')}.json"
        json_data = json.dumps(items)
        s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=json_data)
    except Exception as e:
        print(f"Error: {e}")


# Kết nối tới MinIO
# s3 = boto3.client(
#     's3',
#     endpoint_url='http://localhost:9001',
#     aws_access_key_id='minio',
#     aws_secret_access_key='minio123',
#     config=Config(signature_version='s3v4')
# )

s3 = boto3.client(
    's3',
    endpoint_url=localstack_url,
    aws_access_key_id='test',  # Use any access key and secret key for LocalStack
    aws_secret_access_key='test',
    config=Config(signature_version='s3v4')
)

# s3.create_bucket(Bucket='mybucket')

# Tên bucket
bucket_name = 'mybucket'

# Tải file lên MinIO
file_name = 'data.json'
store_s3_json(s3, items=data, bucket_name="mybucket")
print(f'File "{file_name}" uploaded successfully to bucket "{bucket_name}".')


# List objects in the bucket
response = s3.list_objects(Bucket='mybucket')
print('Objects in mybucket:')
for obj in response.get('Contents', []):
    print(f'  {obj["Key"]}')

File "data.json" uploaded successfully to bucket "mybucket".
Objects in mybucket:
  stripe/2024/07/15/type=json/users_153357.json
  stripe/2024/07/16/type=json/users_090908.json
  stripe/2024/07/16/type=json/users_090924.json
  stripe/2024/07/16/type=json/users_091804.json
  stripe/2024/07/18/type=json/users_104557.json
  stripe/2024/07/18/type=json/users_104602.json
  stripe/2024/07/18/type=json/users_152006.json


In [3]:
import os
# # Set environment variables
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/3.5.1/libexec"
os.environ["AWS_ACCESS_KEY_ID"] = "dummy"
os.environ["AWS_SECRET_ACCESS_KEY"] = "dummy"

# Verify environment variables
print("SPARK_HOME:", os.environ["SPARK_HOME"])
print("AWS_ACCESS_KEY_ID:", os.environ.get("AWS_ACCESS_KEY_ID"))
print("AWS_SECRET_ACCESS_KEY:", os.environ.get("AWS_SECRET_ACCESS_KEY"))


import hashlib

def md5_hash(data: str) -> str:
    salt = "salt"
    data = salt + data
    md5_hasher = hashlib.md5()
    md5_hasher.update(data.encode("utf-8"))
    return md5_hasher.hexdigest()



import findspark
# Initialize findspark with the specified SPARK_HOME
findspark.init(os.environ["SPARK_HOME"])
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Create SparkSession
spark = SparkSession.builder \
    .appName("S3App") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.11.375,ru.yandex.clickhouse:clickhouse-jdbc:0.2.6") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"]) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"]) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:4566") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

pseudonymize_udf = F.udf(md5_hash, StringType())

# Read a JSON file from S3 (LocalStack)
try:
    df = spark.read.json("s3a://mybucket/stripe/2024/07/18/type=json/users_104602.json")
    df = df.withColumn("email", pseudonymize_udf(F.col("email")))
    df.show()
    df.printSchema()
except Exception as e:
    print(f"Error reading data: {e}")


SPARK_HOME: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/longle/.ivy2/cache
The jars for the packages stored in: /Users/longle/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
ru.yandex.clickhouse#clickhouse-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2ba778cc-9d23-4182-931d-302c22ed57ec;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found ru.yandex.clickhouse#clickhouse-jdbc;0.2.6 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found commons-logging#commons-logging;1.2 in central
	found commons-codec#commons-codec;1.11 in central
	found org.apache.httpcomponents#httpmime;4.5.13 in central
	found org.lz4#lz4-java;1.7.1 in central
	found com.fasterxml.jack

+---+--------------------+-----+
|age|               email| name|
+---+--------------------+-----+
| 25|c4d195a831d524f61...|Alice|
| 30|0bf7bb73858b43751...|  Bob|
| 22|78f4d29755f35cfde...|Cathy|
+---+--------------------+-----+

root
 |-- age: long (nullable = true)
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)



                                                                                

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 52670)
Traceback (most recent call last):
  File "/Users/longle/.pyenv/versions/3.10.13/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/longle/.pyenv/versions/3.10.13/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/longle/.pyenv/versions/3.10.13/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/longle/.pyenv/versions/3.10.13/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/python/pyspark/accumulators.py", line 267, in poll
 

In [4]:

clickhouse_url = "jdbc:clickhouse://localhost:8123/default"
clickhouse_properties = {
    "driver": "ru.yandex.clickhouse.ClickHouseDriver",
    "user": "clickhouse-user",
    "password": "secret"
}

# Write DataFrame to ClickHouse
try:
    df.write \
      .format("jdbc") \
      .option("url", clickhouse_url) \
      .option("dbtable", "users") \
      .option("user", clickhouse_properties["user"]) \
      .option("password", clickhouse_properties["password"]) \
      .option("driver", clickhouse_properties["driver"]) \
      .mode("append") \
      .save()
    print("Data written to ClickHouse successfully.")
except Exception as e:
    print(f"Error writing data to ClickHouse: {e}")

Data written to ClickHouse successfully.


24/07/18 15:20:38 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported


In [10]:
from pymongo import MongoClient

# Cấu hình thông tin xác thực
username = 'admin' # Người dùng bạn đã tạo trong mongo-init.js hoặc MONGO_INITDB_ROOT_USERNAME
password = 'admin' # Mật khẩu tương ứng
host = 'localhost' # Hoặc tên dịch vụ MongoDB trong Docker Compose nếu Python chạy trong cùng môi trường Docker
port = '27017'
database_name = 'mongo'

# Tạo chuỗi kết nối
connection_string = f'mongodb://{username}:{password}@{host}:{port}/{database_name}'
# connection_string = f'mongodb://{username}:{password}@{host}:{port}/{database_name}?replicaSet=rs0'

# Kết nối tới MongoDB
client = MongoClient(connection_string)
db = client[database_name]


# Access the collection
# collection = db.mongousers
collection = db.mongousers_3

# Sample data to insert
data = [
    {"date": datetime.now(), "_id":4, "email": "user4@example.com", "name": "long le 4 One 1", "age": 25},
    {"date": datetime.now(), "_id":5, "email": "user5@example.com", "name": "long le  5Two 1", "age": 30},
    {"date": datetime.now(), "_id":6,"email": "user6@example.com", "name": "long le 6 Three 1", "age": 22}
]

# Insert data into the collection
result = collection.insert_many(data)

# Print the inserted IDs
print("Inserted IDs:", result.inserted_ids)

documents = collection.find({}) 
for doc in documents:
    print(doc)


Inserted IDs: [4, 5, 6]
{'_id': 1, 'date': datetime.datetime(2024, 7, 18, 15, 22, 59, 64000), 'email': 'user11@example.com', 'name': 'long le One 1', 'age': 25}
{'_id': 2, 'date': datetime.datetime(2024, 7, 18, 15, 22, 59, 64000), 'email': 'user21@example.com', 'name': 'long le Two 1', 'age': 30}
{'_id': 3, 'date': datetime.datetime(2024, 7, 18, 15, 22, 59, 64000), 'email': 'user31@example.com', 'name': 'long le Three 1', 'age': 22}
{'_id': 4, 'date': datetime.datetime(2024, 7, 18, 15, 30, 35, 394000), 'email': 'user4@example.com', 'name': 'long le 4 One 1', 'age': 25}
{'_id': 5, 'date': datetime.datetime(2024, 7, 18, 15, 30, 35, 394000), 'email': 'user5@example.com', 'name': 'long le  5Two 1', 'age': 30}
{'_id': 6, 'date': datetime.datetime(2024, 7, 18, 15, 30, 35, 394000), 'email': 'user6@example.com', 'name': 'long le 6 Three 1', 'age': 22}


24/07/18 17:29:12 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1872750 ms exceeds timeout 120000 ms
24/07/18 17:29:12 WARN SparkContext: Killing executors is not supported by current scheduler.
24/07/18 17:29:13 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 