## !pip install --upgrade pip
## !pip3 install --upgrade pandas

In [1]:
import os
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SQLContext, SparkSession
# HDFS path handed to Spark as its SQL warehouse directory.
SPARK_WAREHOUSE_DIR = "hdfs://hadoop-service:9000/user/hive/warehouse"

# Build (or reuse) a Hive-enabled session against the master URL below.
# Each executor is limited to one core and 1G of memory.
spark = (
    SparkSession.builder
    .appName("Remote Spark")
    .master("spark://spark-master:7077")
    .config("hive.metastore.uris", "thrift://hms-service:9083")
    .config("spark.sql.warehouse.dir", SPARK_WAREHOUSE_DIR)
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "1G")
    .enableHiveSupport()
    .getOrCreate()
)

# SQLContext wrapper around the same session; the rest of the notebook
# issues its SQL statements through this handle.
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

# Report the library versions in use.
print(f"Spark Version: {spark.version}")
print(f"PySpark Version: {pyspark.__version__}")
print(f"Pandas Version: {pd.__version__}")

Spark Version: 3.3.1
PySpark Version: 3.3.1
Pandas Version: 1.3.5


In [None]:
def progressbar():
    """Print a simple progress indicator until the global ``taskdone`` is truthy.

    Writes ``Start `` once, then checks ``taskdone``; while it is falsy the
    function sleeps five seconds and prints one ``.`` before checking again.
    Output is flushed on every write so the dots appear immediately.
    """
    print('Start ', end='', flush=True)
    while True:
        if taskdone:
            return
        sleep(5)
        print('.', end='', flush=True)

def inittask(droptable = False):
    """Prepare the ``sample`` database and the three tables this notebook uses.

    Creates the database if it is missing, switches the session to it,
    optionally drops the existing tables, issues the
    ``CREATE TABLE IF NOT EXISTS`` statements and finally prints the table
    listing.

    Args:
        droptable: when truthy, drop ``sample.tb_test``, ``sample.tb_sev_u``
            and ``sample.tb_test_qf_stat`` before (re)creating them.
    """
    run_sql = sqlContext.sql

    for bootstrap_stmt in ("create database if not exists sample;", "use sample;"):
        run_sql(bootstrap_stmt)

    if droptable:
        for table_name in ("tb_test", "tb_sev_u", "tb_test_qf_stat"):
            run_sql("drop table if exists sample." + table_name + ";")

    # Main fact table: 31 data columns, partitioned by (cp, ld), ORC with ZLIB.
    ddl_tb_test = """
        CREATE TABLE if not exists sample.tb_test
        (
        f01    BIGINT comment 'unix_timestamp',
        f02    string comment 'f02本端号码（去重后2千万）',
        f03    string,
        f04    string comment 'f04对端号码（去重后2千万）, count distinct clause',
        f05    string,
        f06    string comment 'group by clause, ref-a',
        f07    string comment 'group by clause, ref-b',
        f08    string,
        f09    string,
        f10    INT,
        f11    string,
        f12    string,
        f13    string,
        f14    string comment 'data filtering col, where f14 = 49',
        f15    BIGINT,
        f16    string comment '中文文本内容，需要制表符转变',
        f17    string,
        f18    string,
        f19    INT,
        f20    string,
        f21    string,
        f22    INT comment '1st group by clause, maybe category',
        f23    string,
        f24    string,
        f25    string,
        f26    string,
        f27    string,
        f28    string,
        f29    string,
        f30    bigint comment 'business datetime',
        f31    bigint comment 'processing datetime'
        ) partitioned by (cp bigint, ld bigint) 
        stored as orc tblproperties ("orc.compress"="ZLIB");
    """

    # Small two-column lookup table.
    ddl_tb_sev_u = """
        create table if not exists sample.tb_sev_u (id string, name string);
    """

    # Statistics table, partitioned by (bd, ad).
    ddl_tb_test_qf_stat = """
        create table if not exists sample.tb_test_qf_stat(
        f22 string,
        f02 string,
        f16 string,
        cnt bigint,
        f06    string,
        f07    string
        )partitioned by (bd string, ad bigint);
    """

    for ddl in (ddl_tb_test, ddl_tb_sev_u, ddl_tb_test_qf_stat):
        run_sql(ddl)

    # Show what now exists in the current database.
    run_sql("""show tables;""").show()

In [None]:
import threading
from time import sleep

# Completion flag shared by the two threads started below: progressbar() polls
# it, and longtask() sets it to True at the end of its run.
taskdone = False

def gendata(scale = 1, batch_size = 10000):
    """Generate random rows and append them to ``sample.tb_test`` in rounds.

    Each round builds ``batch_size`` rows in pandas/NumPy, converts them to a
    Spark DataFrame, derives the ``cp`` and ``ld`` partition columns from
    ``f01`` and appends the result to the table. After the last round the
    session is switched to the ``sample`` database and its tables are listed.

    Args:
        scale: multiplier for the number of rounds; the round count is
            ``ceil(scale * 1000 / ceil(10000 / 300))``.
        batch_size: number of rows generated and written per round.
    """
    import math
    from pyspark.sql.functions import from_unixtime

    scale_factor = scale * 1000
    scale_unit = math.ceil(10000 / 300)
    scale_rounds = math.ceil(scale_factor / scale_unit)
    LENGTH = 10  # characters per random text value

    # Loop invariants: final column order, the random-text columns, and the
    # alphabet the text values are drawn from.
    cols = ["f01", "f02", "f03", "f04", "f05", "f06", "f07", "f08", "f09", "f10",
            "f11", "f12", "f13", "f14", "f15", "f16", "f17", "f18", "f19", "f20",
            "f21", "f22", "f23", "f24", "f25", "f26", "f27", "f28", "f29", "f30", "f31"]
    random_txtcols = ["f03", "f05", "f08", "f09", "f11", "f12", "f13", "f16", "f17", "f18",
                      "f20", "f21", "f23", "f24", "f25", "f26", "f27", "f28", "f29"]
    alphabet = np.array(
        list('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
        dtype="|U1",
    )

    for i in range(scale_rounds):
        # NOTE: the original `if 'df' in globals(): del df` was removed. `df` is
        # a local name here, so that `del` raised UnboundLocalError on the first
        # round whenever a notebook-level `df` existed. The name is simply
        # rebound each round instead.
        print(f"running round: {i + 1} of total {scale_rounds} rounds, with batchsize = {batch_size}.")

        start_ts = pd.Timestamp.now()
        start_ts_int = int(start_ts.value / 10**9)
        initial_ts_int = int((start_ts - pd.Timedelta(days=183)).value / 10**9)

        # f02 / f04: numbers drawn from a 20-million-wide range. The explicit
        # 64-bit dtype matters because the values exceed the int32 range, which
        # is NumPy's default integer on some platforms.
        df2 = pd.DataFrame(
            np.random.randint(19200000000, 19200000000 + 20000000,
                              size=(batch_size, 2), dtype=np.int64),
            columns=["f02", "f04"],
        )
        # f01: epoch seconds within the last 183 days.
        df1 = pd.DataFrame(
            np.random.randint(initial_ts_int, start_ts_int, size=(batch_size, 1)),
            columns=["f01"],
        )
        # Small integers in [0, 10).
        df6 = pd.DataFrame(
            np.random.randint(0, 10, size=(batch_size, 5)),
            columns=["f06", "f07", "f10", "f15", "f19"],
        )
        # f14 in {48, 49, 50}.
        df14 = pd.DataFrame(
            np.random.randint(48, 51, size=(batch_size, 1)),
            columns=["f14"],
        )
        # f22 in [1, 10).
        df22 = pd.DataFrame(
            np.random.randint(1, 10, size=(batch_size, 1)),
            columns=["f22"],
        )
        # f30 / f31: epoch seconds within the last 30 days.
        df30 = pd.DataFrame(
            np.random.randint(start_ts_int - (30 * 24 * 3600), start_ts_int, size=(batch_size, 2)),
            columns=["f30", "f31"],
        )

        # Random text columns: LENGTH characters per value.
        text_columns = {}
        for k in random_txtcols:
            chars = np.random.choice(alphabet, [batch_size, LENGTH])
            text_columns[k] = ["".join(row) for row in chars]
        df_text = pd.DataFrame(text_columns)

        df = pd.concat([df1, df2, df6, df14, df22, df30, df_text], axis=1, join='inner')
        df = df[cols]

        sparkDF = spark.createDataFrame(df)
        # NOTE(review): from_unixtime yields strings, while inittask() declares
        # cp/ld as bigint -- confirm the append coerces them as intended.
        sparkDF = sparkDF.withColumn("cp", from_unixtime(sparkDF["f01"], "yyyyMMddHH"))
        sparkDF = sparkDF.withColumn("ld", from_unixtime(sparkDF["f01"], "yyyyMMddHH"))
        # The table name is qualified so the write does not depend on which
        # database the session currently has selected.
        (
            sparkDF.write
            .mode("append")
            .partitionBy(["cp", "ld"])
            .bucketBy(32, "f02")
            .sortBy("f01")
            .format("orc")
            .saveAsTable("sample.tb_test")
        )

        finish_ts = pd.Timestamp.now()
        # total_seconds() covers the whole duration; `.seconds` is only the
        # sub-day component of the timedelta.
        elapsed_seconds = int((finish_ts - start_ts).total_seconds())
        print(f"finished round: {i + 1} for {batch_size} rows with {elapsed_seconds} seconds")

    sqlContext.sql("use sample;")
    sqlContext.sql("show tables").show()
    
        
def longtask():
    """Run table initialisation and data generation, then mark the task done.

    Calls ``inittask(False)`` followed by ``gendata(1, 10*1000)``. The global
    ``taskdone`` flag is set to True in a ``finally`` clause, so the flag is
    raised even when one of the calls fails. Without that, the progress
    thread polling the flag would never exit and joining it would block
    forever. Any exception still propagates to the caller.
    """
    global taskdone
    try:
        inittask(False)
        gendata(1, 10*1000)
    finally:
        taskdone = True

# Two plain threads: one prints progress dots, the other runs the workload.
t1 = threading.Thread(target=progressbar)
t2 = threading.Thread(target=longtask)

# Launch both threads.
t1.start()
t2.start()

# Block until both threads have finished.
t1.join()
t2.join()

print('\nDone!')

# Row count of the generated table.
sqlContext.sql("use sample;")
sqlContext.sql(" select count(*) from tb_test;").show()

# Collect table- and column-level statistics.
sqlContext.sql("ANALYZE TABLE sample.tb_test COMPUTE STATISTICS FOR ALL COLUMNS ")

# Print the extended table description: up to 100 rows, untruncated.
df = sqlContext.sql("DESCRIBE EXTENDED sample.tb_test ")
df.show(n=100, truncate=False)

In [2]:
# Re-check the row count of tb_test in the sample database.
sqlContext.sql("use sample;") 
df = sqlContext.sql(" select count(*) from tb_test;") 
df.show()

+--------+
|count(1)|
+--------+
|   20000|
+--------+

