In [1]:
from pyspark.sql import SparkSession
import numpy as np

In [2]:
# Build (or reuse) the Spark session and keep a handle on its SparkContext,
# which is what the RDD-based cells below use.
spark = SparkSession.builder.appName('Untitled').getOrCreate()
sc = spark.sparkContext

# Load tile data into RDD

In [3]:
NUM_TILES = 70  # tile files frog_dog8x8_tile00.j2k .. frog_dog8x8_tile69.j2k
tileIndicesRdd = sc.parallelize(list(range(NUM_TILES)))

In [4]:
def loadTileData(tileIndex, filenameTemplate="frog_dog8x8_tile{:02d}.j2k"):
    """Read the raw bytes of one J2K tile file.

    Parameters
    ----------
    tileIndex : int
        Zero-based tile number, substituted into ``filenameTemplate``.
    filenameTemplate : str
        ``str.format`` template for the tile filename. The default reproduces
        the original ``frog_dog8x8_tileNN.j2k`` naming (two-digit, zero-padded).

    Returns
    -------
    bytes
        The entire contents of the tile file.

    NOTE(review): the path is relative, so when this runs inside ``rdd.map``
    it resolves against each Spark executor's working directory. That works
    in local mode; on a cluster the files must be reachable from every executor.
    """
    filename = filenameTemplate.format(tileIndex)
    with open(filename, "rb") as j2k_file:
        return j2k_file.read()

# Cache the raw tile bytes. This RDD feeds several later zips and collects;
# without caching, every one of those actions would re-read all tile files
# from disk.
tileDataRdd = tileIndicesRdd.map(loadTileData).cache()

# Examine tile data structure

In [5]:
def getDataOffsets(data):
    """Locate the main-header / tile-header / bitstream boundaries of a J2K tile file.

    Starting just after the SOC marker, walks the length-prefixed marker
    segments until the first SOD marker is found.

    Parameters
    ----------
    data : bytes
        Raw J2K codestream. It must start with SOC (0xFF4F) and end with
        EOC (0xFFD9).

    Returns
    -------
    dict
        ``{'sot': int, 'sod': int, 'eod': int}``, where:

        - ``sot`` is the offset of the SOT marker (0xFF90), which is where the
          main header ends.
        - ``sod`` is the offset of the SOD marker (0xFF93), which is where the
          tile-part header ends.
        - ``eod`` is the offset of the trailing EOC marker, which is where the
          tile bitstream ends.

        An empty dict is returned if the data does not start with SOC, does
        not end with EOC, has no SOD, or reaches SOD without an SOT.

    NOTE(review): only the first tile-part is located. If a file contained
    several tile-parts, the later SOT headers would land inside the
    ``sod:eod`` bitstream slice without having their Isot fixed.
    """
    SOC, SOT, SOD, EOC = b'\xff\x4f', b'\xff\x90', b'\xff\x93', b'\xff\xd9'

    if data[:2] != SOC or data[-2:] != EOC:
        return {}

    pos = 2  # skip SOC, which has no length field
    eot = len(data)
    sot = None
    sod = None
    while pos < eot:
        marker = data[pos:pos+2]
        if marker == SOD:
            sod = pos
            break
        if marker == SOT:
            sot = pos
        # Every marker segment between SOC and SOD (SOT included) carries a
        # 2-byte big-endian length that counts itself plus the segment body,
        # but not the 2-byte marker.
        segment_len = int.from_bytes(data[pos+2:pos+4], byteorder='big')
        pos += 2 + segment_len

    if sod is None or sot is None:
        return {}

    return {'sot': sot, 'sod': sod, 'eod': eot - 2}

# Cached: the offsets are zipped with the tile data in several later cells.
dataOffsetsRdd = tileDataRdd.map(getDataOffsets).cache()
dataOffsets = dataOffsetsRdd.collect()

# getDataOffsets returns {} for a tile it cannot parse. Fail here and name the
# offending tiles, rather than hitting a KeyError in a later cell.
badTiles = [i for i, offsets in enumerate(dataOffsets) if not offsets]
assert not badTiles, f"could not parse tiles: {badTiles}"

dataOffsets

[{'sot': 125, 'sod': 187, 'eod': 1435218},
 {'sot': 125, 'sod': 187, 'eod': 1092016},
 {'sot': 125, 'sod': 187, 'eod': 1246233},
 {'sot': 125, 'sod': 187, 'eod': 1272080},
 {'sot': 125, 'sod': 187, 'eod': 1357103},
 {'sot': 125, 'sod': 187, 'eod': 1408908},
 {'sot': 125, 'sod': 187, 'eod': 1414039},
 {'sot': 125, 'sod': 187, 'eod': 1387895},
 {'sot': 125, 'sod': 187, 'eod': 1069497},
 {'sot': 125, 'sod': 184, 'eod': 633083},
 {'sot': 125, 'sod': 187, 'eod': 1608113},
 {'sot': 125, 'sod': 187, 'eod': 1278186},
 {'sot': 125, 'sod': 187, 'eod': 1439036},
 {'sot': 125, 'sod': 187, 'eod': 1429876},
 {'sot': 125, 'sod': 187, 'eod': 1525465},
 {'sot': 125, 'sod': 187, 'eod': 1585143},
 {'sot': 125, 'sod': 187, 'eod': 1566476},
 {'sot': 125, 'sod': 187, 'eod': 1551768},
 {'sot': 125, 'sod': 187, 'eod': 1266694},
 {'sot': 125, 'sod': 184, 'eod': 675430},
 {'sot': 125, 'sod': 187, 'eod': 1603448},
 {'sot': 125, 'sod': 187, 'eod': 1259712},
 {'sot': 125, 'sod': 187, 'eod': 1442520},
 {'sot': 125,

# Extract Main Header

In [6]:
def extractMainHeader(tileData):
    """Return a tile file's main header: every byte before its SOT marker.

    ``tileData`` is a ``(bytes, offsets)`` pair, with ``offsets`` as produced
    by ``getDataOffsets``.
    """
    data, offsets = tileData
    headerEnd = offsets['sot']
    return data[:headerEnd]

# Pair each tile's bytes with its offsets, and take the main header from the
# first tile.
mainHeader = (
    tileDataRdd
    .zip(dataOffsetsRdd)
    .map(extractMainHeader)
    .first()
)
mainHeader

b'\xffO\xffQ\x00/\x00\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x07\x01\x01\x07\x01\x01\x07\x01\x01\xffR\x00\x0c\x00\x00\x00\x01\x00\x05\x04\x04\x00\x01\xff\\\x00\x13@@HHPHHPHHPHHPHHP\xffd\x00%\x00\x01Created by OpenJPEG version 2.4.0'

# Extract Tile Headers

In [7]:
def extractTileHeader(tileData):
    """Return a tile's tile-part header: bytes from SOT up to (not including) SOD."""
    data, offsets = tileData
    start, stop = offsets['sot'], offsets['sod']
    return data[start:stop]

def fixIsot(tileData):
    """Overwrite the Isot (tile index) field of an SOT tile-part header.

    Parameters
    ----------
    tileData : tuple
        ``(tileHeader, tileIndex)``: the header bytes starting at the SOT
        marker, and the tile's position in the combined image.

    Returns
    -------
    bytes
        ``tileHeader`` with bytes 4-5 replaced by ``tileIndex`` as a 16-bit
        big-endian value. Bytes 4-5 hold Isot because they follow the 2-byte
        SOT marker and the 2-byte Lsot.

    Raises
    ------
    ValueError
        If ``tileHeader`` does not start with the SOT marker (0xFF90). In that
        case bytes 4-5 are not Isot, and patching them would corrupt the header.
    OverflowError
        If ``tileIndex`` does not fit in the 2-byte Isot field.
    """
    tileHeader, tileIndex = tileData
    if tileHeader[:2] != b'\xff\x90':
        raise ValueError(
            f"expected SOT marker at start of tile header, got {tileHeader[:2].hex()!r}"
        )
    isot = tileIndex.to_bytes(2, byteorder='big')
    return tileHeader[:4] + isot + tileHeader[6:]

# Tile-part headers exactly as they appear in each tile file.
tileHeadersRdd = (
    tileDataRdd
    .zip(dataOffsetsRdd)
    .map(extractTileHeader)
)
# Pair each header with its tile number and stamp that number into Isot.
newTileHeadersRdd = (
    tileHeadersRdd
    .zip(tileIndicesRdd)
    .map(fixIsot)
)

### Verify corrected Isot

In [8]:
def parseIsot(tileHeader):
    """Decode the big-endian Isot field stored in bytes 4-5 of an SOT header."""
    isotBytes = tileHeader[4:6]
    return int.from_bytes(isotBytes, 'big')

isotBefore = list(map(parseIsot, tileHeadersRdd.collect()))
isotAfter = list(map(parseIsot, newTileHeadersRdd.collect()))

print('before')
print(isotBefore)
print('after')
print(isotAfter)

# Each rewritten header must carry its own tile index.
assert isotAfter == tileIndicesRdd.collect(), "Isot fields do not match tile indices"

before
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
after
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69]


# Extract Tile Bitstream Data

In [9]:
def extractTileBitstream(tileData):
    """Return a tile's bitstream: bytes from SOD up to (not including) the trailing EOC."""
    data, offsets = tileData
    return data[slice(offsets['sod'], offsets['eod'])]

# SOD marker through the end of the coded data, one entry per tile.
tileBitstreamRdd = (
    tileDataRdd
    .zip(dataOffsetsRdd)
    .map(extractTileBitstream)
)

# Modify SIZ marker segment

***In production code, we will need to determine which fields in the main header (and tile headers) must be modified. For now, the SIZ fields are simply hardcoded to reflect the size of the overall image.***

In [10]:
def parseSizFields(header):
    """Decode the SIZ marker segment at the start of a J2K main header.

    Assumes ``header`` begins with SOC (2 bytes) immediately followed by the
    SIZ marker (2 bytes), so the SIZ fields start at byte 4. Only the first
    component's Ssiz/XRsiz/YRsiz triple is decoded.

    Returns
    -------
    dict
        Field name -> value. ``Rsiz`` and ``Ssiz`` are hex strings; all other
        fields are big-endian unsigned integers.

    Raises
    ------
    ValueError
        If the SIZ marker (0xFF51) is not at bytes 2-3.
    """
    if header[2:4] != b'\xff\x51':
        raise ValueError(f"expected SIZ marker at bytes 2-3, got {header[2:4].hex()!r}")

    def be(start, stop):
        return int.from_bytes(header[start:stop], byteorder='big')

    return {
        'Lsiz': be(4, 6),
        'Rsiz': header[6:8].hex(),
        'XSiz': be(8, 12),
        'YSiz': be(12, 16),
        'XOSiz': be(16, 20),
        'YOSiz': be(20, 24),
        'XTSiz': be(24, 28),
        'YTSiz': be(28, 32),
        'XTOSiz': be(32, 36),
        'YTOSiz': be(36, 40),
        'Csiz': be(40, 42),
        'Ssiz': header[42:43].hex(),
        'XRsiz': be(43, 44),
        'YRsiz': be(44, 45),
    }

parseSizFields(mainHeader)

{'Lsiz': 47,
 'Rsiz': '0000',
 'XSiz': 1024,
 'YSiz': 1024,
 'XOSiz': 0,
 'YOSiz': 0,
 'XTSiz': 1024,
 'YTSiz': 1024,
 'XTOSiz': 0,
 'YTOSiz': 0,
 'Csiz': 3,
 'Ssiz': '07',
 'XRsiz': 1,
 'YRsiz': 1}

In [11]:
# Overall image size, hardcoded for this image for now.
# Width: 9 full 1024-px tiles plus a 384-px remainder.
# Height: 6 full 1024-px tiles plus a 536-px remainder.
newXSiz = 9*1024 + 384
newYSiz = 6*1024 + 536

# XSiz/YSiz occupy bytes 8-11 and 12-15, after SOC, the SIZ marker, Lsiz and
# Rsiz. That layout only holds if SIZ directly follows SOC, so check it.
assert mainHeader[2:4] == b'\xff\x51', "expected SIZ marker right after SOC"

newMainHeader = (
    mainHeader[0:8]
    + newXSiz.to_bytes(4, byteorder='big')
    + newYSiz.to_bytes(4, byteorder='big')
    + mainHeader[16:]
)
# Only field values change; Lsiz and all later offsets must stay valid.
assert len(newMainHeader) == len(mainHeader)

# Read the SIZ fields back from the patched header.
# Each row of the table is (name, start, stop, showAsHex).
{
    name: (newMainHeader[lo:hi].hex() if asHex
           else int.from_bytes(newMainHeader[lo:hi], byteorder='big'))
    for name, lo, hi, asHex in (
        ('Lsiz', 4, 6, False),
        ('Rsiz', 6, 8, True),
        ('XSiz', 8, 12, False),
        ('YSiz', 12, 16, False),
        ('XOSiz', 16, 20, False),
        ('YOSiz', 20, 24, False),
        ('XTSiz', 24, 28, False),
        ('YTSiz', 28, 32, False),
        ('XTOSiz', 32, 36, False),
        ('YTOSiz', 36, 40, False),
        ('Csiz', 40, 42, False),
        ('Ssiz', 42, 43, True),
        ('XRsiz', 43, 44, False),
        ('YRsiz', 44, 45, False),
    )
}

{'Lsiz': 47,
 'Rsiz': '0000',
 'XSiz': 9600,
 'YSiz': 6680,
 'XOSiz': 0,
 'YOSiz': 0,
 'XTSiz': 1024,
 'YTSiz': 1024,
 'XTOSiz': 0,
 'YTOSiz': 0,
 'Csiz': 3,
 'Ssiz': '07',
 'XRsiz': 1,
 'YRsiz': 1}

# Create new tile RDD with the modified headers

### Start with main header (first tile only)

In [12]:
# The combined codestream has one main header, placed in front of the first
# tile; every other tile starts empty. The list has one entry per tile, so
# the parallelized RDD is sliced the same way as tileIndicesRdd, which the
# zip() calls in the following cells rely on.
numTiles = tileIndicesRdd.count()
newTiles = [newMainHeader if i == 0 else b'' for i in range(numTiles)]
newTilesRdd = sc.parallelize(newTiles)

print(newTilesRdd.map(len).collect())

[125, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


### Add the (modified) tile headers

In [13]:
# Append each tile's Isot-corrected tile-part header.
newTilesRdd = newTilesRdd.zip(newTileHeadersRdd).map(lambda pair: b''.join(pair))

print(newTilesRdd.map(len).collect())

[187, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 62, 62, 62, 62, 62, 62, 62, 62, 59, 62, 59, 59, 59, 59, 62, 62, 62, 59, 59]


### Add the bitstream data

In [14]:
# Append each tile's SOD + coded data.
newTilesRdd = newTilesRdd.zip(tileBitstreamRdd).map(lambda pair: b''.join(pair))

print(newTilesRdd.map(len).collect())

[1435218, 1091891, 1246108, 1271955, 1356978, 1408783, 1413914, 1387770, 1069372, 632958, 1607988, 1278061, 1438911, 1429751, 1525340, 1585018, 1566351, 1551643, 1266569, 675305, 1603323, 1259587, 1442395, 1415475, 1481715, 1540312, 1590600, 1541980, 1253533, 664326, 1562334, 1207293, 1364774, 1359430, 1470361, 1539926, 1548114, 1501283, 1199787, 666451, 1471825, 1125133, 1269219, 1320969, 1396522, 1445309, 1452598, 1419308, 1107865, 650033, 1587535, 1250440, 1404379, 1402718, 1505172, 1569570, 1551535, 1535659, 1234146, 672036, 894111, 706244, 801668, 784779, 826470, 856786, 893955, 853902, 711228, 364981]


### Add EOC (last tile only)

In [15]:
# EOC (0xFFD9) terminates the combined codestream, so only the last tile
# carries it. Derive the last index from the tile count instead of hardcoding it.
lastTile = tileIndicesRdd.count() - 1
eocRdd = sc.parallelize([b'\xff\xd9' if i == lastTile else b'' for i in range(lastTile + 1)])

newTilesRdd = newTilesRdd.zip(eocRdd).map(lambda x: x[0] + x[1])

print(newTilesRdd.map(len).collect())

[1435218, 1091891, 1246108, 1271955, 1356978, 1408783, 1413914, 1387770, 1069372, 632958, 1607988, 1278061, 1438911, 1429751, 1525340, 1585018, 1566351, 1551643, 1266569, 675305, 1603323, 1259587, 1442395, 1415475, 1481715, 1540312, 1590600, 1541980, 1253533, 664326, 1562334, 1207293, 1364774, 1359430, 1470361, 1539926, 1548114, 1501283, 1199787, 666451, 1471825, 1125133, 1269219, 1320969, 1396522, 1445309, 1452598, 1419308, 1107865, 650033, 1587535, 1250440, 1404379, 1402718, 1505172, 1569570, 1551535, 1535659, 1234146, 672036, 894111, 706244, 801668, 784779, 826470, 856786, 893955, 853902, 711228, 364983]


In [16]:
# Sanity check: compare each rebuilt tile's length with its source file.

tileDataLen = np.array(tileDataRdd.map(len).collect())
newTilesLen = np.array(newTilesRdd.map(len).collect())

deltaLen = tileDataLen - newTilesLen

# Expected delta (bytes removed from each source file):
#   first tile:      2                    = stripped EOC marker
#   last tile:       len(mainHeader)      = stripped main header (125 here)
#   all other tiles: len(mainHeader) + 2  = stripped EOC marker + main header
EOC_LEN = 2
expectedDelta = np.full(len(deltaLen), len(mainHeader) + EOC_LEN)
expectedDelta[0] = EOC_LEN
expectedDelta[-1] = len(mainHeader)

print(deltaLen)
assert np.array_equal(deltaLen, expectedDelta), "unexpected per-tile length change"

[  2 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127
 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127
 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127
 127 127 127 127 127 127 127 127 127 127 127 127 127 127 127 125]


# Get tile data sizes and assign each tile to an upload block (each block at least 5 MB)

In [40]:
# Minimum bytes per block. This is presumably chosen to match S3's 5 MiB
# minimum multipart part size (all parts except the last).
MIN_BLOCK_BYTES = 5 * 1024 * 1024  # 5242880


def assignBlockIndices(sizes, minBlockBytes=MIN_BLOCK_BYTES):
    """Greedily group consecutive items into blocks of at least ``minBlockBytes``.

    Items are taken in order. A block is closed as soon as its running total
    reaches ``minBlockBytes``, so only the final block may be smaller.

    Returns a list giving the block index of each item (same length as ``sizes``).
    """
    indices = []
    blockIndex = 0
    blockBytes = 0
    for size in sizes:
        indices.append(blockIndex)
        blockBytes += size
        if blockBytes >= minBlockBytes:
            blockIndex += 1
            blockBytes = 0
    return indices


newTilesLen = newTilesRdd.map(len).collect()
newIndices = assignBlockIndices(newTilesLen)

newIndicesRdd = sc.parallelize(newIndices)

# RDD of (blockIndex, tileBytes) pairs. It is cached because later cells
# filter it once per block.
indexedTilesRdd = newIndicesRdd.zip(newTilesRdd).cache()

# sanity check
indexedTilesRdd.map(lambda x: (x[0], len(x[1]))).collect()

[(0, 1435218),
 (0, 1091891),
 (0, 1246108),
 (0, 1271955),
 (0, 1356978),
 (1, 1408783),
 (1, 1413914),
 (1, 1387770),
 (1, 1069372),
 (2, 632958),
 (2, 1607988),
 (2, 1278061),
 (2, 1438911),
 (2, 1429751),
 (3, 1525340),
 (3, 1585018),
 (3, 1566351),
 (3, 1551643),
 (4, 1266569),
 (4, 675305),
 (4, 1603323),
 (4, 1259587),
 (4, 1442395),
 (5, 1415475),
 (5, 1481715),
 (5, 1540312),
 (5, 1590600),
 (6, 1541980),
 (6, 1253533),
 (6, 664326),
 (6, 1562334),
 (6, 1207293),
 (7, 1364774),
 (7, 1359430),
 (7, 1470361),
 (7, 1539926),
 (8, 1548114),
 (8, 1501283),
 (8, 1199787),
 (8, 666451),
 (8, 1471825),
 (9, 1125133),
 (9, 1269219),
 (9, 1320969),
 (9, 1396522),
 (9, 1445309),
 (10, 1452598),
 (10, 1419308),
 (10, 1107865),
 (10, 650033),
 (10, 1587535),
 (11, 1250440),
 (11, 1404379),
 (11, 1402718),
 (11, 1505172),
 (12, 1569570),
 (12, 1551535),
 (12, 1535659),
 (12, 1234146),
 (13, 672036),
 (13, 894111),
 (13, 706244),
 (13, 801668),
 (13, 784779),
 (13, 826470),
 (13, 856786),
 (

In [41]:
indexedTilesRdd.filter(lambda kv: kv[0] == 1).mapValues(len).collect()

[(1, 1408783), (1, 1413914), (1, 1387770), (1, 1069372)]

# This is where we will write to S3

In [44]:
# Each block is pulled out with its own filter/collect job. Persist the RDD so
# those jobs do not each recompute the full lineage, which would re-read every
# tile file. Calling cache() again on an already-cached RDD is harmless.
indexedTilesRdd.cache()

num_indices = len(set(newIndices))
for i in range(num_indices):
    # Bind i as a default argument so the predicate does not rely on late binding.
    blockData = indexedTilesRdd.filter(lambda x, blockIndex=i: x[0] == blockIndex).map(lambda x: x[1]).collect()
    numTilesInBlock = len(blockData)
    blockData = b''.join(blockData)
    bytesInBlock = len(blockData)
    megabytesInBlock = bytesInBlock / (1024*1024)
    print(f'Adding tiles in block {i} to S3: {numTilesInBlock} tiles containing {megabytesInBlock:.2f}MB')

Adding tiles in block 0 to S3: 5 tiles containing 6.11MB
Adding tiles in block 1 to S3: 4 tiles containing 5.04MB
Adding tiles in block 2 to S3: 5 tiles containing 6.09MB
Adding tiles in block 3 to S3: 4 tiles containing 5.94MB
Adding tiles in block 4 to S3: 5 tiles containing 5.96MB
Adding tiles in block 5 to S3: 4 tiles containing 5.75MB
Adding tiles in block 6 to S3: 5 tiles containing 5.94MB
Adding tiles in block 7 to S3: 4 tiles containing 5.47MB
Adding tiles in block 8 to S3: 5 tiles containing 6.09MB
Adding tiles in block 9 to S3: 5 tiles containing 6.25MB
Adding tiles in block 10 to S3: 5 tiles containing 5.93MB
Adding tiles in block 11 to S3: 4 tiles containing 5.31MB
Adding tiles in block 12 to S3: 4 tiles containing 5.62MB
Adding tiles in block 13 to S3: 7 tiles containing 5.29MB
Adding tiles in block 14 to S3: 4 tiles containing 2.69MB


### For now, instead of uploading, rejoin the blocks into one file and verify that it is a valid J2K codestream

In [47]:
# Reassemble the codestream block by block, in the same order the S3 parts
# would be uploaded, then write it out so it can be checked with a J2K decoder.
blocks = []
num_indices = len(set(newIndices))
for i in range(num_indices):
    blockTiles = indexedTilesRdd.filter(lambda x, blockIndex=i: x[0] == blockIndex).map(lambda x: x[1]).collect()
    blocks.append(b''.join(blockTiles))

# Join once at the end. Repeated `bytes + bytes` would copy the growing
# buffer on every iteration.
fullJ2kData = b''.join(blocks)

print(len(fullJ2kData))

# Cheap structural checks before writing: SOC first, EOC last, no bytes lost.
assert fullJ2kData[:2] == b'\xff\x4f', "missing SOC marker at start"
assert fullJ2kData[-2:] == b'\xff\xd9', "missing EOC marker at end"
assert len(fullJ2kData) == sum(newTilesLen), "block assembly lost or duplicated bytes"

with open("frog_dog8x8_rejoined.j2k","wb") as j2k_file:
    j2k_file.write(fullJ2kData)

87518980
