In [None]:
# Thư viện
import os

import numpy as np
import pandas as pd
import pymongo
from bson.son import SON
from clickhouse_driver import Client
from pyspark.sql import SparkSession

In [None]:
# Connect to the Spark master and cap this application at 8 cores.
spark = (
    SparkSession.builder
    .master('spark://10.56.10.100:7077')
    .config('spark.cores.max', 8)
    .getOrCreate()
)

In [None]:
# MongoDB collections intended to be synced.
# Display only: the list is not bound to a name; the collection that is
# actually processed is selected through `rawCollection` in the config cell.
[
    'ChiDinhDichVu',
    'ChiDinhDichVu_PhauThuatThuThuat',
    'HoaDon',
    'HoaDon_ChiTietHoaDon',
    'ThongTinChuyenTuyen',
    'ThongTinChuyenTuyen_DichVuKemTheo',
    'ThongTinDieuTri',
    'ThongTinDieuTri_ThongTinDieuTriKhoa',
    'ThongTinDieuTri_ThongTinDieuTriKhoa_ThongTinDieuTriPhong',
]

In [None]:
# MongoDB input.
# Credentials can be supplied through environment variables so that they do not
# have to be stored in the notebook. The literals are only fallbacks that keep
# the previous behaviour when the variables are unset.
mgUri = os.environ.get(
    'SYNC_MONGO_URI',
    'mongodb://versatica:***********@*********:27017/?authSource=admin&authMechanism=SCRAM-SHA-256',
)
host = "*******"  # passed as `host` to the ClickHouse Client(...) cell
mgDatabase = 'phutho'
rawCollection = 'ChiDinhDichVu'
# Collection read by Spark: the raw collection name with a 'newdata_' prefix.
mgCollection = 'newdata_' + rawCollection

# ClickHouse input.
chUrl = "jdbc:clickhouse://*********:8123"
chUser = os.environ.get('SYNC_CLICKHOUSE_USER', "default")
chPassword = os.environ.get('SYNC_CLICKHOUSE_PASSWORD', "@!")
# Target table: the raw collection name with a 'STAGING_' prefix.
chTable = "STAGING_" + rawCollection
chDatabase = "new_vietsens"

In [None]:
# Options passed to Spark's JDBC writer when appending to the ClickHouse table.
clickhouse = dict(
    url=f'{chUrl}/{chDatabase}',
    user=chUser,
    password=chPassword,
    dbtable=chTable,
    isolationLevel='NONE',
)

In [None]:
# ClickHouse client used by the DESCRIBE TABLE / ALTER TABLE cells.
# NOTE(review): `client` is rebound to a pymongo.MongoClient further down in
# this notebook; re-running the ClickHouse cells after that point will use the
# Mongo client instead of this one.
client = Client(
    host=host,
    port="9000",
    user=chUser,
    password=chPassword,
)

In [None]:
# Load the MongoDB collection `mgCollection` into a Spark DataFrame.
collection_df = (
    spark.read.format("mongodb")
    .option("connection.uri", mgUri)
    .option('database', mgDatabase)
    .option('collection', mgCollection)
    .load()
)

### Danh sách cột trên Collection

In [None]:
class dataFrameSpark:
    """Bundle a Spark DataFrame with the names and data types of its columns.

    Attributes:
        df: the wrapped DataFrame.
        listColumnName: ``name`` of every entry in ``df.schema.fields``, in
            schema order, captured when the wrapper is constructed.
        listColumnType: ``dataType`` of the same entries, in the same order.
    """

    def __init__(self, df):
        schemaFields = list(df.schema.fields)
        self.df = df
        # Snapshot taken once here; the lists are not refreshed if `df` is
        # reassigned afterwards.
        self.listColumnName = [schemaField.name for schemaField in schemaFields]
        self.listColumnType = [schemaField.dataType for schemaField in schemaFields]

In [None]:
# Wrap the loaded collection so its column names and types are available as lists.
newDataDF = dataFrameSpark(df=collection_df)

### Danh sách cột trên Clickhouse

In [None]:
# Lookup table from the Spark data-type string (column 'mongoType') to the
# ClickHouse type (column 'type') used when adding new columns.
DIM_type = pd.DataFrame(
    [
        ('StringType()', 'String'),
        ('DoubleType()', 'Float64'),
        ('LongType()', 'Int64'),
        ('TimestampType()', 'DateTime'),
        ('IntegerType()', 'Int64'),
    ],
    columns=['mongoType', 'type'],
).astype(str)

In [None]:
# Column names and Spark data types of the MongoDB collection, both as strings.
mgTableInfo = pd.DataFrame(
    {
        'name': newDataDF.listColumnName,
        'mongoType': newDataDF.listColumnType,
    }
).astype(str)

In [None]:
# Print the schema of the loaded collection DataFrame (before the cells below drop columns).
newDataDF.df.printSchema()

In [None]:
# Column names and data types of the target ClickHouse table.
# The rows returned by DESCRIBE TABLE are labelled positionally; the sixth
# label used to be misspelled as 'codec_exression)'.
chTableInfo = pd.DataFrame(
    client.execute(f'''DESCRIBE TABLE {chDatabase}.{chTable}'''),
    columns=[
        'name',
        'clickhouseType',
        'default_type',
        'default_expression',
        'comment',
        'codec_expression',
        'ttl_expression',
    ],
).astype(str)
chTableInfo

In [None]:
# Attach the ClickHouse column info to every MongoDB column (left join on
# 'name'); columns absent from the ClickHouse table get a null clickhouseType.
mgTableInfo = mgTableInfo.merge(chTableInfo, how='left', on='name')
mgTableInfo

In [None]:
# Drop columns whose Spark type is NullType from the DataFrame to be written.
nullTypeList = mgTableInfo.loc[mgTableInfo['mongoType'].eq('NullType()')]
for nullColumn in nullTypeList['name']:
    print(nullColumn)
    newDataDF.df = newDataDF.df.drop(nullColumn)

In [None]:
# Drop columns whose Spark type string starts with 'Array' from the DataFrame
# to be written.
isArrayColumn = mgTableInfo['mongoType'].str.startswith('Array', na=False)
arrayTypeList = mgTableInfo.loc[isArrayColumn]
for arrayColumn in arrayTypeList['name']:
    print(arrayColumn)
    newDataDF.df = newDataDF.df.drop(arrayColumn)

In [None]:
# Add the columns that exist in MongoDB but not yet in the ClickHouse table.
# Candidates are rows with no ClickHouse type after the left merge, excluding
# NullType and Array columns (those are dropped from the DataFrame, not synced).
mgTableInfo = mgTableInfo[mgTableInfo.clickhouseType.isnull()]
mgTableInfo = mgTableInfo[mgTableInfo['mongoType'] != 'NullType()']
mgTableInfo = mgTableInfo[mgTableInfo['mongoType'].str[:5] != 'Array']
# Remove leftovers from a previous run of this cell; otherwise the merge below
# would produce 'type_x'/'type_y' and the 'type' lookup would fail.
mgTableInfo = mgTableInfo.drop(columns=['type', 'query'], errors='ignore')
mgTableInfo = pd.merge(mgTableInfo, DIM_type, on='mongoType', how='left')

# Fail fast, before any ALTER is executed, when a Spark type has no entry in
# DIM_type. Previously such a row produced a NaN query that was passed to
# client.execute().
unmappedColumns = mgTableInfo[mgTableInfo['type'].isnull()]
if not unmappedColumns.empty:
    raise ValueError(
        'No ClickHouse type mapping in DIM_type for: '
        + ', '.join(unmappedColumns['name'] + ' (' + unmappedColumns['mongoType'] + ')')
    )

# Backtick-quote the column name (it comes from MongoDB field names) and use
# IF NOT EXISTS so that re-running the cell is harmless.
quotedColumnName = '`' + mgTableInfo['name'].str.replace('`', '\\`', regex=False) + '`'
mgTableInfo['query'] = (
    f'''ALTER TABLE {chDatabase}.{chTable} ADD COLUMN IF NOT EXISTS '''
    + quotedColumnName
    + " Nullable(" + mgTableInfo['type'] + ")"
)
for alterQuery in mgTableInfo['query']:
    client.execute(alterQuery)

In [None]:
# Print the schema of the DataFrame that is about to be written to ClickHouse.
newDataDF.df.printSchema()

### CALL Procedure trên Mongo

In [None]:
# Start the sync: append the DataFrame to the ClickHouse table through JDBC,
# using the options collected in `clickhouse`.
(
    newDataDF.df.write
    .format('jdbc')
    .mode('append')
    .options(**clickhouse)
    .save()
)

In [None]:
# Build definition: the source collection plus the aggregation pipeline that
# flattens HoaDon.ChiTietDichVu into the collection named in the final $out stage.
# The key "pipline" (sic) is kept as-is because the build loop reads it by that name.
build_newdata_hoadon_chitiethoadon = {"Collection" : "HoaDon","pipline" :[
            # Keep documents synced from 2022-01-01 on that have ChiTietDichVu entries with an _id.
            # $exists takes a boolean; the former string "true" only worked because
            # any string is treated as truthy.
            { "$match": { "NgayDongBo": { "$gte": 20220101000000 },
                    "ChiTietDichVu": { "$exists": True },
                    "ChiTietDichVu._id": { "$exists": True }
                } },
            # { "$sort": { "NgayDongBo": 1 } },
            # {"$limit": 100000},
            # Join DanhMucBenhVien on MaCSKCB == MaBenhVien; matches land in the array "CSKCB".
            { "$lookup": {
                    "from": "DanhMucBenhVien",
                    "localField": "MaCSKCB",
                    "foreignField": "MaBenhVien",
                    "as": "CSKCB",
                } },
            # One output document per ChiTietDichVu element.
            { "$unwind": "$ChiTietDichVu" },
            { "$project": {
                    "ThongTinHoaDonId": { "$toString": "$_id" },
                    "NgayThu": 1,
                    "MaKyThuatDungChung": "$ChiTietDichVu.MaKyThuatDungChung",
                    "NgayHuy": { "$ifNull": [ "$NgayHuy", None ] } ,
                    # TenHuyen of the first matched DanhMucBenhVien document.
                    "QuanHuyenCSKCB": { "$arrayElemAt": ["$CSKCB.TenHuyen", 0] },
                    # New _id: "<invoice _id>_<ChiTietDichVu._id>".
                    "_id": { "$concat": [{ "$toString": "$_id" }, '_', "$ChiTietDichVu._id"] },
                    # NgayDongBo (numeric yyyymmddHHMMSS) parsed as a date in Asia/Ho_Chi_Minh;
                    # null when NgayDongBo is below 20200101000000.
                    "NgayDongBo1": {
                        "$cond": [{ "$lt": ["$NgayDongBo", 20200101000000] }, None,
                            { "$dateFromString": { "dateString": { "$toString": "$NgayDongBo" }, "format": "%Y%m%d%H%M%S", "timezone": "Asia/Ho_Chi_Minh" } }]
                    }
                } },
    # Constant Sign / Version fields added to every output document.
    {"$addFields": {"Sign" : 1}},
    {"$addFields": {"Version" : 1}},
    # Write the result to this collection.
    { "$out": "newdata_HoaDon_ChiTietHoaDon" }
    ]}

In [None]:
# MongoDB client used to run the build pipelines.
# NOTE(review): this rebinds `client`, which earlier in the notebook refers to
# the ClickHouse client; ClickHouse cells re-run after this one will get the
# Mongo client.
client = pymongo.MongoClient(host=mgUri)

In [None]:
# Build definitions to execute, in order.
ListBuild = [build_newdata_hoadon_chitiethoadon]

In [None]:
# Run every build definition: aggregate its pipeline on its source collection.
# The loop variable is no longer called `input` (which shadowed the builtin),
# and the database comes from `mgDatabase` instead of a repeated literal.
for buildSpec in ListBuild:
    print(buildSpec['Collection'])
    Collection = client[mgDatabase][buildSpec['Collection']]
    Collection.aggregate(buildSpec['pipline'])
    print("Done")