RING-44425 - Comments for SPARK scripts #48
@@ -31,6 +31,9 @@
COS = cfg["cos_protection"]
PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"])

# The arcindex maps the ARC schema to the hex value found in the ring key, in the 24 bits preceding the last 8 bits of the key
# e.g. FD770A344D6A6D259F92C500000000512040C070
#      FD770A344D6A6D259F92C50000000051XXXXXX70 where XXXXXX : 2040C0
arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}

os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
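A minimal sketch, not part of the patch, showing how the schema field described in the comment above can be read back out of a 40-character ring key; the helper name extract_arc_schema is hypothetical.

arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}

def extract_arc_schema(ringkey):
    # the 24-bit schema field is the 6 hex characters preceding the last 2 hex characters (8 bits)
    field = ringkey[-8:-2]
    for schema, hexval in arcindex.items():
        if hexval == field:
            return schema
    return None

print(extract_arc_schema("FD770A344D6A6D259F92C500000000512040C070"))  # "8+4"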
@@ -118,6 +121,7 @@ def sparse(f):


def check_split(key):
"""With srebuildd, check if the RING key is split or not. Return True if split, False if not split, None if error (422, 404, 50X, etc.)"""
url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
r = requests.head(url)
if r.status_code == 200:
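A minimal sketch, assuming the dict shape implied by the callers rather than the docstring above (blob() below reads split['result'] and split['is_split']); this is not the project's actual implementation, and the helper name check_split_sketch is hypothetical.

import requests

def check_split_sketch(key, srebuildd_ip, srebuildd_arc_path):
    # same URL construction as check_split above, with the constants passed in as parameters
    url = "http://%s:81/%s/%s" % (srebuildd_ip, srebuildd_arc_path, str(key.zfill(40)))
    try:
        r = requests.head(url)
    except requests.exceptions.ConnectionError:
        return {"result": False, "is_split": None}
    if r.status_code != 200:
        # 422, 404, 50X, etc.
        return {"result": False, "is_split": None}
    # How SPLIT-ness is derived from the response is not visible in this hunk,
    # so a placeholder value is used here.
    return {"result": True, "is_split": False}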
@@ -126,9 +130,16 @@ def check_split(key):


def blob(row):
"""Return a list of dicts with the sproxyd input key, its subkey if it exists, and digkey"""
# set key from row._c2 (column 3), which contains an sproxyd input key
# input structure: (bucket name, s3 object key, sproxyd input key)
# FIXME: the naming of the method is terrible
key = row._c2
# use the sproxyd input key to find out if the key is split or not
# check_split(key) transforms the input key into a RING key, checks that it exists AND whether it is a SPLIT
split = check_split(key)

if not split['result']:
# If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}]
if split['is_split']:
try:
@@ -146,25 +157,61 @@ def blob(row):
chunks = chunk + chunk
chunkshex = chunks.encode("hex")
rtlst = []
# the k value is the subkey; a subkey is the sproxyd input key for each stripe of the split
for k in list(set(sparse(chunkshex))):
# "key": key == primary sproxyd input key of a split object
# "subkey": k == subkey sproxyd input key of an individual stripe of a split object
# "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey
# digkey: the unique part of a main chunk before service id,
# arc schema, and class are appended
rtlst.append(
{"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
)
# If the key is split and the request is OK:
# return a list of dicts with the key (primary sproxyd input key),
# subkey (sproxyd input key of a split stripe) and
# digkey (md5 of the subkey)
# digkey: the unique part of a main chunk before service id,
# arc schema, and class are appended
return rtlst
# If the key is split and the request is not OK:
# return a dict with the key (primary sproxyd input key)
# with both subkey and digkey columns set to NOK
return [{"key": key, "subkey": "NOK", "digkey": "NOK"}]
except requests.exceptions.ConnectionError as e:
# If there is a ConnectionError in the HTTP request:
# return a dict with the key (primary sproxyd input key),
# with both subkey and digkey set to NOK_HTTP
return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}]
if not split['is_split']:
# If the key is not split:
# return a dict with the key (primary sproxyd input key),
# subkey set to SINGLE and
# digkey (md5 of the key)
# digkey: the unique part of a main chunk before service id,
# arc schema, and class are appended
return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]

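A minimal sketch, not part of the patch and only an assumption about gen_md5_from_id: the digkey appears to be the first 26 hex characters of an MD5 digest of an sproxyd input key, which would look roughly like this.

import hashlib

def example_digkey(input_key):
    # assumption: gen_md5_from_id produces an MD5 hex digest of the key string
    return hashlib.md5(input_key.encode()).hexdigest().upper()[:26]

print(example_digkey("9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220"))  # 26-character digkey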
new_path = os.path.join(PATH, RING, "s3-bucketd")
files = "%s://%s" % (PROTOCOL, new_path)

# reading without a header,
# columns _c0, _c1, _c2 are the default column names of
# columns 1, 2, 3 for the csv
# input structure: (bucket name, s3 object key, sproxyd input key)
# e.g. test,48K_object.01,9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220
# Required Fields:
# - _c2 (sproxyd input key)
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)

# repartition the dataframe to have the same number of partitions as the number of executors * cores
df = df.repartition(PARTITIONS)
# Return a new Resilient Distributed Dataset (RDD) by applying a function to each element of this RDD.
rdd = df.rdd.map(lambda x : blob(x))

# Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. Then transform it into a dataframe.
dfnew = rdd.flatMap(lambda x: x).toDF()
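A small illustration, not part of the patch, of why flatMap is needed here: blob() returns a list of dicts per input row, so map() alone yields nested lists, and flatMap() flattens them into one sequence of dicts.

nested = [
    [{"key": "A", "subkey": "SINGLE"}],
    [{"key": "B", "subkey": "k1"}, {"key": "B", "subkey": "k2"}],
]
flat = [d for sub in nested for d in sub]
print(flat)  # three dicts in a single flat list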

single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
# write the dataframe to a csv file with a header

# output structure: (digkey, sproxyd input key, subkey if available)
dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)
Review comment: If we want correct headers, this write operation w/ the suggested change requires updating the p2 script to read the new column names instead of the generic ones.
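A hedged sketch of that idea (the actual suggested change is not reproduced above): rename the output columns before writing so the CSV header is meaningful. The names below are illustrative, following the "output structure" comment, and any downstream script reading this file would then have to use them instead of the generic ones.

dfnew = dfnew.toDF("digkey", "sproxyd_input_key", "subkey")
dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)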
@@ -39,37 +39,77 @@


files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
# FIXME skip native column names, rename on the fly.
# reading without a header,
# columns _c0, _c1, _c2, _c3 are the default column names of
# columns 1, 2, 3, 4 for the csv
# REQUIRED N, Y, N, Y
# input structure: (RING key, main chunk, disk, flag)
# e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
# Required Fields:
# - _c1 (main chunk)
# - _c3 (flag)
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
Review comment: This is another spot where we can inject valid headers prior to later commands, making them a bit simpler to comprehend (suggested change). In this example I name _c0 (the ring chunk key) as ringkey, instead of naming _c1, the main chunk, as ringkey. I think this could reduce confusion if we decide to be very specific and use explicit terms for each data type.
Reply: I like the underscore versions for better readability.
Reply: whatever's easier to read is fine by me
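A hedged sketch of that approach (the exact suggested change is not reproduced above); the column names are illustrative, following the "input structure" comment:

df = (spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .load(files)
      .toDF("ring_key", "main_chunk", "disk", "flag"))
# later filters would then reference "main_chunk" and "flag" instead of "_c1" and "_c3"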


# list the ARC SPLIT main chunks
# list the ARC_REPLICATED main chunks from column 2 with service ID 50 and flag = 0 (new), into a single column (vector) named "_c1"
# FIXME rename column on the fly
df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")

# In df_split, match keys which end in 70 from the column with literal name "_c1", into a new dataframe dfARCsingle
# FIXME explain what we're trying to accomplish by identifying such keys (0x50: ARC_REPLICATED, per UKS --> calls for a COS ending, not an ARC ending). Under what circumstances should such keys exist?
dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
# Filter out keys with strictly fewer than 4 stripe chunks (RING orphans), creating a new "count" column on the fly
# dfARCsingle now has two columns ("_c1", "count")
dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")

# in dfARCsingle, duplicate column named "_c1" into a new "ringkey" (aka. "main chunk") column
# dfARCsingle now has three columns ("_c1", "count", "ringkey")
# FIXME do the renaming some place else, e.g. upon dataframe creation, be consistent about it
dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])

# in df_split (a vector of main chunks), filter column named "_c1" RING key main chunk for the configured COS protection
dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))

# count the number of chunks in column named "_c1" found for each key, creating the "count" column on the fly
# dfCOSsingle now has two columns ("_c1", "count")
dfCOSsingle = dfCOSsingle.groupBy("_c1").count()
# in dfCOSsingle, duplicate column named "_c1" into a new "ringkey" (aka. "main chunk") column
# dfCOSsingle now has three columns ("_c1", "count", "ringkey")
dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"])
# in dfCOSsingle, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
# FIXME: say why we need those 4 extra characters (about 18% more weight than the 22-char md5 alone)
dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
Review comment: This operation likely explains why the …

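A quick check, not part of the patch, of what the substring expression above does: dropping the last 14 characters of a 40-character main chunk leaves its first 26 characters.

main_chunk = "8000004F3F3A54FFEADF8C00000000511470C070"
prefix = main_chunk[:len(main_chunk) - 14]
print(prefix, len(prefix))  # 8000004F3F3A54FFEADF8C0000 26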
# union the dfCOSsingle and dfARCsingle dataframes ("_c1", "count", "ringkey")
dfARCsingle = dfARCsingle.union(dfCOSsingle)

# list the ARC KEYS
# list the ARC_SINGLE keys with service ID 51
# repeat the same logic as before, with a different initial mask
# Output is a three-column matrix that will be unioned with the previous dataframe dfARCsingle
df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1")

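A quick illustration, not part of the patch, of the two service-ID masks, applied to the sample main chunk (_c1) from the listkeys.csv comment above:

import re

key = "8000004F3F3A54FFEADF8C00000000511470C070"
arc_replicated = re.compile(r".*000000..50........$")  # service ID 50 mask used for df_split
arc_single = re.compile(r".*000000..51........$")      # service ID 51 mask used for df_sync
print(bool(arc_replicated.match(key)))  # False: this key carries service ID 51
print(bool(arc_single.match(key)))      # True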
# Match keys which end in 70 from column 2
Review comment: The way this reads makes me wonder about a column named "single", which we do not have. This comment comes before the df_sync and dfARCSYNC operations. Does "from single column" intend to suggest the match is not on more than one column? If so, for clarity I suggest a rephrasing (suggested change).
Reply: thanks for rephrasing 😄

dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$"))
# Filter out keys with strictly fewer than 4 stripe chunks (RING orphans)
dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3")
# dfARCSYNC "_c1" column is duplicated into a "ringkey" column
dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"])
# in dfARCSYNC, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))

# filter "_c1" for the configured COS protection
dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
# count the number of chunks in "_c1" found for each key
dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count()
# dfCOCSYNC "_c1" column is duplicated into a "ringkey" column
dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"])
# in dfCOCSYNC, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))

# union the two previous dataframes
dfARCSYNC = dfARCSYNC.union(dfCOCSYNC)

# union again the two outstanding dataframes dfARCSYNC and dfARCsingle into a dftotal dataframe
dftotal = dfARCSYNC.union(dfARCsingle)
total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
dftotal.write.format("csv").mode("overwrite").options(header="true").save(total)