diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
new file mode 100644
index 0000000..b9ca43d
--- /dev/null
+++ b/.github/workflows/build.yaml
@@ -0,0 +1,38 @@
+name: Build Rucio/Butler ingestd container
+on:
+  push:
+    tags:
+      - v*
+  pull_request:
+
+env:
+  HERMESK_NAME: rucio-daemons-hermesk
+
+jobs:
+  push:
+    runs-on: ubuntu-latest
+    permissions:
+      packages: write
+      contents: read
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Build ingest image
+        working-directory: docker
+        run: |
+          docker-compose -f "docker-compose.yml" --env-file versions.env build $HERMESK_NAME
+
+      - name: Log in to GitHub Container Registry
+        run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
+
+      - name: Push image
+        run: |
+          HERMESK_ID=ghcr.io/${{ github.repository_owner }}/$HERMESK_NAME
+
+          # Read the image version from docker/versions.env
+          VERSION=$(grep RUCIO_DAEMONS_HERMESK_VERSION docker/versions.env | cut -d '=' -f 2)
+          echo HERMESK_ID=$HERMESK_ID
+          echo VERSION=$VERSION
+          docker tag $HERMESK_NAME $HERMESK_ID:$VERSION
+          docker push $HERMESK_ID:$VERSION
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..94a9ed0
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,674 @@
+                    GNU GENERAL PUBLIC LICENSE
+                       Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+                            Preamble
+
+  The GNU General Public License is a free, copyleft license for
+software and other kinds of works.
+
+  The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works.  By contrast,
+the GNU General Public License is intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users.  We, the Free Software Foundation, use the
+GNU General Public License for most of our software; it applies also to
+any other work released this way by its authors.  You can apply it to
+your programs, too.
+
+  When we speak of free software, we are referring to freedom, not
+price.  Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+  To protect your rights, we need to prevent others from denying you
+these rights or asking you to surrender the rights.  Therefore, you have
+certain responsibilities if you distribute copies of the software, or if
+you modify it: responsibilities to respect the freedom of others.
+
+  For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must pass on to the recipients the same
+freedoms that you received.  You must make sure that they, too, receive
+or can get the source code.  And you must show them these terms so they
+know their rights.
+
+  Developers that use the GNU GPL protect your rights with two steps:
+(1) assert copyright on the software, and (2) offer you this License
+giving you legal permission to copy, distribute and/or modify it.
+
+  For the developers' and authors' protection, the GPL clearly explains
+that there is no warranty for this free software.
For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. +States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. 
+ + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. 
+ + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. 
+ + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. 
+ + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. 
+ + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. 
If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). 
To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. 
+ + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. 
+
+                     END OF TERMS AND CONDITIONS
+
+            How to Apply These Terms to Your New Programs
+
+  If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+  To do so, attach the following notices to the program.  It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+    <one line to give the program's name and a brief idea of what it does.>
+    Copyright (C) <year>  <name of author>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+  If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+    <program>  Copyright (C) <year>  <name of author>
+    This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+    This is free software, and you are welcome to redistribute it
+    under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License.  Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+  You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<https://www.gnu.org/licenses/>.
+
+  The GNU General Public License does not permit incorporating your program
+into proprietary programs.  If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library.  If this is what you want to do, use the GNU Lesser General
+Public License instead of this License.  But first, please read
+<https://www.gnu.org/philosophy/why-not-lgpl.html>.
diff --git a/SConstruct b/SConstruct
new file mode 100644
index 0000000..acb9bda
--- /dev/null
+++ b/SConstruct
@@ -0,0 +1,4 @@
+# -*- python -*-
+from lsst.sconsUtils import scripts
+# Python-only package
+scripts.BasicSConstruct("ctrl_rucio_ingest", disableCc=True)
diff --git a/bin/rucio-hermesk b/bin/rucio-hermesk
new file mode 100755
index 0000000..8b9325d
--- /dev/null
+++ b/bin/rucio-hermesk
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# srp
+# -*- coding: utf-8 -*-
+# Copyright European Organization for Nuclear Research (CERN) since 2012
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Hermesk is a daemon that gets messages and sends them to external services (InfluxDB, Elasticsearch, ActiveMQ, Kafka).
+"""
+
+import argparse
+import signal
+
+from rucio.daemons.hermes.kafka.hermesk import run, stop
+
+
+def get_parser():
+    """
+    Returns the argparse parser.
+    """
+    parser = argparse.ArgumentParser(description="Hermesk is a daemon that gets messages and sends them to external services (InfluxDB, Elasticsearch, ActiveMQ, Kafka).", formatter_class=argparse.RawDescriptionHelpFormatter, epilog='''
+Run the daemon::
+
+    $ rucio-hermesk --run-once
+    ''')
+    parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
+    parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: number of threads')
+    parser.add_argument("--bulk", action="store", default=1000, type=int, help='Bulk control: number of requests per cycle')
+    parser.add_argument("--sleep-time", action="store", default=10, type=int, help='Delay control: seconds to sleep between cycles')
+    return parser
+
+
+if __name__ == "__main__":
+
+    signal.signal(signal.SIGTERM, stop)
+
+    parser = get_parser()
+    args = parser.parse_args()
+    try:
+        run(once=args.run_once,
+            threads=args.threads,
+            bulk=args.bulk,
+            sleep_time=args.sleep_time)
+    except KeyboardInterrupt:
+        stop()
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
new file mode 100644
index 0000000..3d536d8
--- /dev/null
+++ b/docker/docker-compose.yml
@@ -0,0 +1,13 @@
+version: "3.7"
+
+services:
+  rucio-daemons-hermesk:
+    container_name: rucio-daemons-hermesk
+    platform: linux/x86_64
+    image: rucio-daemons-hermesk
+    build:
+      dockerfile: ./docker/rucio-daemons-hermesk/Dockerfile
+      context: ../
+      args:
+        - RUCIO_DAEMONS_VERSION
+        - CONFLUENT_KAFKA_VERSION
diff --git a/docker/rucio-daemons-hermesk/Dockerfile b/docker/rucio-daemons-hermesk/Dockerfile
new file mode 100644
index 0000000..bc68ac0
--- /dev/null
+++ b/docker/rucio-daemons-hermesk/Dockerfile
@@ -0,0 +1,14 @@
+ARG RUCIO_DAEMONS_VERSION
+FROM rucio/rucio-daemons:${RUCIO_DAEMONS_VERSION}
+
+ARG CONFLUENT_KAFKA_VERSION
+RUN pip install confluent_kafka==${CONFLUENT_KAFKA_VERSION}
+RUN pip install retrying
+
+RUN mkdir -p /etc/grid-security/certificates
+RUN mkdir /usr/local/lib/python3.9/site-packages/rucio/daemons/hermes/kafka
+COPY ./lib/rucio/daemons/hermes/kafka /usr/local/lib/python3.9/site-packages/rucio/daemons/hermes/kafka/
+COPY lib/rucio/common/constants.py /usr/local/lib/python3.9/site-packages/rucio/common/constants.py
+COPY bin/rucio-hermesk /usr/local/bin/rucio-hermesk
+
+# ENTRYPOINT ["/start-daemon.sh"]
diff --git a/docker/versions.env b/docker/versions.env
new file mode 100644
index 0000000..aa2ca27
--- /dev/null
+++ b/docker/versions.env
@@ -0,0 +1,3 @@
+RUCIO_DAEMONS_HERMESK_VERSION=34.6.0.1
+RUCIO_DAEMONS_VERSION=release-34.6.0
+CONFLUENT_KAFKA_VERSION=1.9.2
diff --git a/etc/docker/dev/docker-compose-kafka.yml b/etc/docker/dev/docker-compose-kafka.yml
new file mode 100644
index 0000000..65388a1
--- /dev/null
+++ b/etc/docker/dev/docker-compose-kafka.yml
@@ -0,0 +1,440 @@
+version: "3"
+services:
+  rucioclient:
+    platform: linux/x86_64
+    #image: docker.io/rucio/rucio-dev:latest-alma9
+    image: docker.io/rucio/rucio-dev:release-33.5.0
+    entrypoint: ["/rucio_source/etc/docker/dev/rucio_entrypoint.sh"]
+    command: ["sleep", "infinity"]
+    profiles:
+      - client
+    volumes:
+      - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
+      - ../../certs/hostcert_rucio.pem:/etc/grid-security/hostcert.pem:z
+      - 
../../certs/hostcert_rucio.key.pem:/etc/grid-security/hostkey.pem:z + - ../../certs/rucio_ca.pem:/opt/rucio/etc/rucio_ca.pem:z + - ../../certs/ruciouser.pem:/opt/rucio/etc/usercert.pem:z + - ../../certs/ruciouser.key.pem:/opt/rucio/etc/userkey.pem:z + - ../../certs/ruciouser.certkey.pem:/opt/rucio/etc/usercertkey.pem:z + - ../../certs/ssh/ruciouser_sshkey.pub:/root/.ssh/ruciouser_sshkey.pub:z + - ../../certs/ssh/ruciouser_sshkey:/root/.ssh/ruciouser_sshkey:z + - ../../../tools:/opt/rucio/tools:Z + - ../../../bin:/opt/rucio/bin:Z + - ../../../lib:/opt/rucio/lib:Z + - ../../../tests:/opt/rucio/tests:Z + - ../../../:/rucio_source:ro + #- ../../../../modfiles/httpd.conf:/etc/httpd/conf/httpd.conf:z + - ../../../../ctrl_rucio_ingest/etc/rucio.cfg:/opt/rucio/etc/rucio.cfg:z + - ../../../../ctrl_rucio_ingest/lib/rucio/common/schema/lsst.py:/opt/rucio/lib/rucio/common/schema/lsst.py:z + - ../../../../ctrl_rucio_ingest/lib/rucio/common/constants.py:/opt/rucio/lib/rucio/common/constants.py:z + - ../../../../ctrl_rucio_ingest/lib/rucio/rse/protocols/protocol.py:/opt/rucio/lib/rucio/rse/protocols/protocol.py:z + - ../../../../test_data:/opt/rucio/test_data + - ../../../../disks:/rucio/disks + environment: + - RUCIO_SOURCE_DIR=/rucio_source + - X509_USER_CERT=/opt/rucio/etc/usercert.pem + - X509_USER_KEY=/opt/rucio/etc/userkey.pem + - RDBMS + rucio: + #image: docker.io/rucio/rucio-dev:latest-alma9 + platform: linux/x86_64 + image: docker.io/rucio/rucio-dev:release-33.5.0 + # entrypoint: ["/rucio_source/etc/docker/dev/rucio_entrypoint.sh"] + command: ["httpd","-D","FOREGROUND"] + volumes: + - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z + - ../../certs/hostcert_rucio.pem:/etc/grid-security/hostcert.pem:z + - ../../certs/hostcert_rucio.key.pem:/etc/grid-security/hostkey.pem:z + - ../../certs/rucio_ca.pem:/opt/rucio/etc/rucio_ca.pem:z + - ../../certs/ruciouser.pem:/opt/rucio/etc/usercert.pem:z + - ../../certs/ruciouser.key.pem:/opt/rucio/etc/userkey.pem:z + - ../../certs/ruciouser.certkey.pem:/opt/rucio/etc/usercertkey.pem:z + - ../../certs/ssh/ruciouser_sshkey.pub:/root/.ssh/ruciouser_sshkey.pub:z + - ../../certs/ssh/ruciouser_sshkey:/root/.ssh/ruciouser_sshkey:z + - ../../../tools:/opt/rucio/tools:Z + - ../../../bin:/opt/rucio/bin:Z + - ../../../lib:/opt/rucio/lib:Z + - ../../../tests:/opt/rucio/tests:Z + - ../../../:/rucio_source:ro + #- ../../../../modfiles/httpd.conf:/etc/httpd/conf/httpd.conf:z + - ../../../../ctrl_rucio_ingest/etc/rucio.cfg:/opt/rucio/etc/rucio.cfg:z + - ../../../../ctrl_rucio_ingest/lib/rucio/common/schema/lsst.py:/opt/rucio/lib/rucio/common/schema/lsst.py:z + - ../../../../ctrl_rucio_ingest/lib/rucio/common/constants.py:/opt/rucio/lib/rucio/common/constants.py:z + - ../../../../ctrl_rucio_ingest/lib/rucio/rse/protocols/protocol.py:/opt/rucio/lib/rucio/rse/protocols/protocol.py:z + - ../../../../test_data:/opt/rucio/test_data + - ../../../../disks:/rucio/disks + environment: + - PYTHONPATH=${PYTHONPATH}:/opt/permissions/rubin + - RUCIO_SOURCE_DIR=/rucio_source + - X509_USER_CERT=/opt/rucio/etc/usercert.pem + - X509_USER_KEY=/opt/rucio/etc/userkey.pem + - RDBMS + ruciodb: + platform: linux/x86_64 + image: docker.io/postgres:14 + environment: + - POSTGRES_USER=rucio + - POSTGRES_DB=rucio + - POSTGRES_PASSWORD=secret + command: ["-c", "fsync=off","-c", "synchronous_commit=off","-c", "full_page_writes=off"] + graphite: + image: docker.io/graphiteapp/graphite-statsd + influxdb: + platform: linux/x86_64 + image: docker.io/influxdb:latest + environment: + - 
DOCKER_INFLUXDB_INIT_MODE=setup
+      - DOCKER_INFLUXDB_INIT_USERNAME=myusername
+      - DOCKER_INFLUXDB_INIT_PASSWORD=passwordpasswordpassword
+      - DOCKER_INFLUXDB_INIT_ORG=rucio
+      - DOCKER_INFLUXDB_INIT_BUCKET=rucio
+      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
+  elasticsearch:
+    image: docker.io/elasticsearch:7.4.0
+    environment:
+      - discovery.type=single-node
+  activemq:
+    image: docker.io/webcenter/activemq:latest
+    environment:
+      - ACTIVEMQ_CONFIG_NAME=activemq
+      - ACTIVEMQ_CONFIG_DEFAULTACCOUNT=false
+      - ACTIVEMQ_USERS_fts=supersecret
+      - ACTIVEMQ_GROUPS_writes=fts
+      - ACTIVEMQ_USERS_receiver=supersecret
+      - ACTIVEMQ_GROUPS_reads=receiver
+      - ACTIVEMQ_CONFIG_SCHEDULERENABLED=true
+      - ACTIVEMQ_USERS_hermes=supersecret
+      - ACTIVEMQ_CONFIG_QUEUES_events=/queue/events
+  postgres14:
+    image: docker.io/postgres:14
+    profiles:
+      - postgres14
+    environment:
+      - POSTGRES_USER=rucio
+      - POSTGRES_DB=rucio
+      - POSTGRES_PASSWORD=rucio
+    command: ["-c", "fsync=off","-c", "synchronous_commit=off","-c", "full_page_writes=off"]
+  mysql8:
+    image: docker.io/mysql:8
+    profiles:
+      - mysql8
+    environment:
+      - MYSQL_USER=rucio
+      - MYSQL_PASSWORD=rucio
+      - MYSQL_ROOT_PASSWORD=rucio
+      - MYSQL_DATABASE=rucio
+      - MYSQL_TCP_PORT=3308
+    command:
+      - "--default-authentication-plugin=mysql_native_password"
+      - "--character-set-server=latin1"
+  oracle:
+    image: docker.io/gvenzl/oracle-xe:18.4.0
+    profiles:
+      - oracle
+    environment:
+      - ORACLE_PASSWORD=rucio
+      - ORACLE_ALLOW_REMOTE=true
+      - ORACLE_DISABLE_ASYNCH_IO=true
+      - processes=1000
+      - sessions=1105
+      - transactions=1215
+    volumes:
+      - ./oracle_setup.sh:/container-entrypoint-initdb.d/oracle_setup.sh:Z
+  fts:
+    #image: docker.io/rucio/fts
+    platform: linux/x86_64
+    image: docker.io/rucio/fts:33.5.0
+    profiles:
+      - storage
+    volumes:
+      - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
+      - ../../certs/hostcert_fts.pem:/etc/grid-security/hostcert.pem:Z
+      - ../../certs/hostcert_fts.key.pem:/etc/grid-security/hostkey.pem:Z
+    ulimits:
+      nofile:
+        soft: 10240
+        hard: 10240
+  ftsdb:
+    platform: linux/x86_64
+    image: docker.io/mysql:8
+    profiles:
+      - storage
+    command: --default-authentication-plugin=mysql_native_password
+    environment:
+      - MYSQL_USER=fts
+      - MYSQL_PASSWORD=fts
+      - MYSQL_ROOT_PASSWORD=fts
+      - MYSQL_DATABASE=fts
+  xrd1:
+    #image: docker.io/rucio/xrootd
+    platform: linux/x86_64
+    image: docker.io/rucio/xrootd:33.5.0
+    profiles:
+      - storage
+    environment:
+      - XRDPORT=1094
+    volumes:
+      - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
+      - ../../certs/hostcert_xrd1.pem:/tmp/xrdcert.pem:Z
+      - ../../certs/hostcert_xrd1.key.pem:/tmp/xrdkey.pem:Z
+      - ../../../../disks/xrd1/rucio:/rucio
+    ulimits:
+      nofile:
+        soft: 10240
+        hard: 10240
+  xrd2:
+    #image: docker.io/rucio/xrootd
+    platform: linux/x86_64
+    image: docker.io/rucio/xrootd:33.5.0
+    profiles:
+      - storage
+    environment:
+      - XRDPORT=1095
+    volumes:
+      - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
+      - ../../certs/hostcert_xrd2.pem:/tmp/xrdcert.pem:Z
+      - ../../certs/hostcert_xrd2.key.pem:/tmp/xrdkey.pem:Z
+      - ../../../../disks/xrd2/rucio:/rucio
+    ulimits:
+      nofile:
+        soft: 10240
+        hard: 10240
+  xrd3:
+    #image: docker.io/rucio/xrootd
+    platform: linux/x86_64
+    image: docker.io/rucio/xrootd:33.5.0
+    profiles:
+      - storage
+    environment:
+      - XRDPORT=1096
+    volumes:
+      - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
+      - ../../certs/hostcert_xrd3.pem:/tmp/xrdcert.pem:Z
+      - ../../certs/hostcert_xrd3.key.pem:/tmp/xrdkey.pem:Z
+      - 
../../../../disks/xrd3/rucio:/rucio + ulimits: + nofile: + soft: 10240 + hard: 10240 + xrd4: + #image: docker.io/rucio/xrootd + platform: linux/x86_64 + image: docker.io/rucio/xrootd:33.5.0 + profiles: + - storage + environment: + - XRDPORT=1097 + volumes: + - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z + - ../../certs/hostcert_xrd4.pem:/tmp/xrdcert.pem:Z + - ../../certs/hostcert_xrd4.key.pem:/tmp/xrdkey.pem:Z + - ../../../../disks/xrd4/rucio:/rucio + ulimits: + nofile: + soft: 10240 + hard: 10240 + minio: + image: docker.io/minio/minio + profiles: + - storage + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + volumes: + - ../../certs/hostcert_minio.pem:/root/.minio/certs/public.crt:Z + - ../../certs/hostcert_minio.key.pem:/root/.minio/certs/private.key:Z + command: ["server", "/data"] + ssh1: + image: docker.io/rucio/ssh + profiles: + - storage + volumes: + - ../../certs/ssh/ruciouser_sshkey.pub:/tmp/sshkey.pub:Z + mongo: + image: docker.io/mongo:5.0 + profiles: + - externalmetadata + postgres: + image: docker.io/postgres:14 + profiles: + - externalmetadata + environment: + - POSTGRES_USER=rucio + - POSTGRES_DB=metadata + - POSTGRES_PASSWORD=secret + command: ["-p", "5433"] + logstash: + image: docker.elastic.co/logstash/logstash-oss:7.3.2 + profiles: + - monitoring + command: bash -c "logstash-plugin install logstash-input-stomp ; /usr/local/bin/docker-entrypoint" + volumes: + - ./pipeline.conf:/usr/share/logstash/pipeline/pipeline.conf:Z + kibana: + image: docker.io/kibana:7.4.0 + profiles: + - monitoring + grafana: + image: docker.io/grafana/grafana:latest + profiles: + - monitoring + db-iam: + image: mariadb:10.11 + profiles: + - iam + environment: + - TZ=Europe/Paris + - MYSQL_ROOT_PASSWORD=supersecret + - MYSQL_USER=iam + - MYSQL_PASSWORD=secret + - MYSQL_DATABASE=iam_db + nginx-iam: + image: nginx + profiles: + - iam + dns_search: cern.ch + environment: + TZ: Europe/Paris + NGINX_HOST: iam + NGINX_PORT: 443 + volumes: + - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z + # - ../../certs/hostcert_rucio.pem:/etc/grid-security/hostcert.pem:z + # - ../../certs/hostcert_rucio.key.pem:/etc/grid-security/hostkey.pem:z + - /etc/grid-security/:/etc/grid-security/ + - /dev/urandom:/dev/random + - ../../iam-assets/iam.conf:/etc/nginx/conf.d/default.conf:ro + iam: + profiles: + - iam + image: indigoiam/iam-login-service:v1.8.2 + volumes: + - ../../iam-assets/keystore.jwks:/keystore.jwks:ro + environment: + - IAM_JAVA_OPTS=-Djava.security.egd=file:/dev/urandom -Dspring.profiles.active=prod,oidc,cern,registration,wlcg-scopes -agentlib:jdwp=transport=dt_socket,server=y,address=1044,suspend=n -Dlogging.file.name=/var/log/iam/iam.log + - IAM_HOST= + - IAM_PORT=8090 + - IAM_BASE_URL=https:// + - IAM_ISSUER=https:// + - IAM_FORWARD_HEADERS_STRATEGY=native + - IAM_KEY_STORE_LOCATION=file:/keystore.jwks + - IAM_JWK_CACHE_LIFETIME=21600 + # - IAM_X509_TRUST_ANCHORS_DIR=/etc/grid-security/certificates + # - IAM_X509_TRUST_ANCHORS_REFRESH=14400 + - IAM_TOMCAT_ACCESS_LOG_ENABLED=false + - IAM_TOMCAT_ACCESS_LOG_DIRECTORY=/tmp + - IAM_ACTUATOR_USER_USERNAME=user + - IAM_ACTUATOR_USER_PASSWORD=secret + - IAM_LOCAL_RESOURCES_ENABLE=true + - IAM_LOCAL_RESOURCES_LOCATION=file:/indigo-iam/local-resources + - IAM_ORGANISATION_NAME=rucio-dc + - IAM_TOPBAR_TITLE="INDIGO IAM for rucio-dc" + - IAM_DB_HOST= + - IAM_DB_PORT=3307 + - IAM_DB_NAME=iam_db + - IAM_DB_USERNAME=iam + - IAM_DB_PASSWORD=secret + zookeeper: + image: 
confluentinc/cp-zookeeper:latest + platform: linux/x86_64 + profiles: + - kafka + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + #ports: + # - 22181:2181 + kafka: + image: confluentinc/cp-server:latest + platform: linux/x86_64 + #hostname: kafka + container_name: kafka + #network_mode: "service:rucio" + profiles: + - kafka + depends_on: + - zookeeper + # ports: + # - "9092:9092" + # - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 + CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092 + CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 + CONFLUENT_METRICS_ENABLE: 'true' + CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' + volumes: + - ../../../../test_data:/opt/rucio/test_data + testenv-ingestd: + image: testenv-ingestd:1.1 + profiles: + - kafka + environment: + CTRL_INGESTD_CONFIG: /home/lsst/ctrl_ingestd/etc/ingestd.yml + #network_mode: "service:rucio" + volumes: + - /tmp/ingestd:/tmp/ + - ../../../../disks:/rucio/disks + - ../../../../ctrl_ingestd:/home/lsst/ctrl_ingestd + - ../../../../tmp_new:/home/lsst/tmp_new + - ../../../../test_data:/home/lsst/test_data + command: ["tail", "-f", "/dev/null"] + test-register: + container_name: test-register + image: test-register + volumes: + - /tmp/register:/tmp/ + - ../../../../disks:/rucio/disks + - ../../../../dm_rucio_register:/home/lsst/dm_rucio_register + - ../../../../ctrl_ingestd:/home/lsst/ctrl_ingestd + - ../../../../embargo-butler:/home/lsst/embargo-butler + - ../../../../tmp_new:/home/lsst/tmp_new + - ../../../../test_data:/home/lsst/test_data + - ../../../../rucio_sup:/home/lsst/rucio_sup + - ../../ctrl_rucio_ingest/etc/rucio.cfg:/opt/rucio/etc/rucio.cfg:z + - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z + - ../../certs/hostcert_rucio.pem:/etc/grid-security/hostcert.pem:z + - ../../certs/hostcert_rucio.key.pem:/etc/grid-security/hostkey.pem:z + - ../../certs/rucio_ca.pem:/opt/rucio/etc/rucio_ca.pem:z + - ../../certs/ruciouser.pem:/opt/rucio/etc/usercert.pem:z + - ../../certs/ruciouser.key.pem:/opt/rucio/etc/userkey.pem:z + - ../../certs/ruciouser.certkey.pem:/opt/rucio/etc/usercertkey.pem:z + - ../../certs/ssh/ruciouser_sshkey.pub:/root/.ssh/ruciouser_sshkey.pub:z + - ../../certs/ssh/ruciouser_sshkey:/root/.ssh/ruciouser_sshkey:z + - ../../../../ctrl_rucio_ingest/etc/rucio.cfg:/opt/rucio/etc/rucio.cfg:z + - ../../../../../.exrc:/opt/rucio/.exrc:z + kafdrop: + image: obsidiandynamics/kafdrop:latest + platform: linux/x86_64 + profiles: + - kafka + # network_mode: "service:rucio" + environment: + KAFKA_BROKER_CONNECT: kafka:9092 + SERVER_PORT: 9550 + MANAGEMENT_SERVER_PORT: 9550 + hermesk: + image: rucio-daemons-hermesk:33.5.0.1 + profiles: + - kafka + environment: + - RUCIO_DAEMON=hermesk + volumes: + - ../../../../ctrl_rucio_ingest/etc/rucio.cfg:/opt/rucio/etc/rucio.cfg:Z + - 
../../../../rucio/etc/certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:Z
+      - ../../../:/rucio_source:ro
+    #entrypoint: ["/rucio_source/etc/docker/dev/rucio_entrypoint.sh"]
+    #command: ["sleep", "infinity"]
diff --git a/etc/rucio.cfg b/etc/rucio.cfg
new file mode 100644
index 0000000..b216634
--- /dev/null
+++ b/etc/rucio.cfg
@@ -0,0 +1,201 @@
+[common]
+logdir = /var/log/rucio
+loglevel = DEBUG
+mailtemplatedir=/opt/rucio/etc/mail_templates
+
+[client]
+rucio_host = https://rucio:443
+auth_host = https://rucio:443
+auth_type = userpass
+username = ddmlab
+password = secret
+ca_cert = /etc/grid-security/certificates/5fca1cb1.0
+client_cert = /opt/rucio/etc/usercert.pem
+client_key = /opt/rucio/etc/userkey.pem
+client_x509_proxy = $X509_USER_PROXY
+account = root
+request_retries = 3
+
+[database]
+default = postgresql://rucio:secret@ruciodb/rucio
+schema = dev
+echo=0
+pool_recycle=3600
+pool_size=20
+max_overflow=20
+pool_reset_on_return=rollback
+
+[bootstrap]
+userpass_identity = ddmlab
+userpass_pwd = secret
+userpass_email = rucio-dev@cern.ch
+
+# Default development client certificate from /opt/rucio/etc/usercert.pem
+x509_identity = /CN=Rucio User
+x509_email = rucio-dev@cern.ch
+
+# Default Kerberos account
+gss_identity = rucio-dev@CERN.CH
+gss_email = rucio-dev@cern.ch
+
+[monitor]
+carbon_server = graphite
+carbon_port = 8125
+user_scope = docker
+
+[conveyor]
+scheme = https,davs,gsiftp,root,srm,mock
+#scheme = https
+#user_transfers = cms
+#user_activities = ['dummy_user_activity']
+#hostcert = /etc/grid-security/hostcert.pem
+#hostkey = /etc/grid-security/hostkey.pem
+transfertool = fts3
+ftshosts = https://fts:8446
+cacert = /etc/grid-security/certificates/5fca1cb1.0
+usercert = /opt/rucio/etc/usercertkey.pem
+
+[messaging-fts3]
+port = 61613
+nonssl_port = 61613
+use_ssl = False
+ssl_key_file = /opt/rucio/etc/userkey.pem
+ssl_cert_file = /opt/rucio/etc/usercert.pem
+destination = /topic/transfer.fts_monitoring_complete
+username = receiver
+password = supersecret
+brokers = activemq
+voname = atlas
+
+[messaging-hermes]
+username = hermes
+password = supersecret
+port = 61613
+nonssl_port = 61613
+use_ssl = False
+destination = /queue/events
+ssl_key_file = /opt/rucio/etc/userkey.pem
+ssl_cert_file = /opt/rucio/etc/usercert.pem
+brokers = activemq
+voname = atlas
+email_from = Rucio <spamspamspam@cern.ch>
+email_test = spamspamspam@cern.ch
+
+[transmogrifier]
+maxdids = 100000
+
+[accounts]
+# These are accounts that can write into scopes owned by another account
+special_accounts = panda, tier0
+
+[trace]
+tracedir = /var/log/rucio/trace
+brokers=activemq
+port=61013
+username = username
+password = password
+topic = /topic/rucio.tracer
+
+[nongrid-trace]
+tracedir = /var/log/rucio/trace
+brokers=activemq
+port=61013
+username = username
+password = password
+topic = /topic/rucio.tracer
+
+[tracer-kronos]
+brokers=activemq
+port=61013
+ssl_key_file = /opt/rucio/etc/userkey.pem
+ssl_cert_file = /opt/rucio/etc/usercert.pem
+queue = /queue/Consumer.kronos.rucio.tracer
+prefetch_size = 10
+chunksize = 10
+subscription_id = rucio-tracer-listener
+use_ssl = False
+reconnect_attempts = 100
+excluded_usrdns = /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=gangarbt/CN=722147/CN=Robot: Ganga Robot/CN=proxy
+username = username
+password = password
+dataset_wait = 60
+
+[injector]
+file = /opt/rucio/tools/test.file.1000
+bytes = 1000
+md5 = fd21ce524a9e45060fd3f62c4ef6a386
+adler32 = 52590737
+
+[alembic]
+cfg = /opt/rucio/etc/alembic.ini
+
+[messaging-cache]
+port = 61023
+ssl_key_file = 
/opt/rucio/etc/userkey.pem +ssl_cert_file = /opt/rucio/etc/usercert.pem +destination = /topic/rucio.cache +brokers = activemq +voname = atlas +account = cache_mb + +[test] +cacert = /etc/grid-security/certificates/5fca1cb1.0 +usercert = /opt/rucio/etc/usercert.pem +userkey = /opt/rucio/etc/userkey.pem + +[nagios] +proxy = /opt/rucio/etc/usercertkey.pem +rfcproxy = /opt/rucio/etc/usercertkey.pem +fts_servers = https://fts3:8446 + +[auditor] +cache = /opt/rucio/auditor-cache +results = /opt/rucio/auditor-results + +[hermes] +email_from = Rucio +email_test = spamspamspam@cern.ch +services_list = kafka + +[c3po] +placement_algorithm = t2_free_space +elastic_url = http://elastic:9200 +redis_host = rucio +redis_port = 6379 + +[c3po-popularity] +elastic_url = http://elastic:9200 + +[c3po-site-mapper] +panda_url = http://agis:80/request/pandaqueue/query/list/?json +ddm_url = http://agis:80/request/ddmendpoint/query/list/?json + +[c3po-workload] +panda_url = http://bigpanda:80/jobs/?category=analysis&jobstatus=running +window = 604800 + +[policy] +package = rubin +#permission = atlas +#schema = atlas +##schema = lsst +schema = lsst +#lfn2pfn_algorithm_default = hash +lfn2pfn_algorithm_default = identity +##lfn2pfn_algorithm_default = rubinbutler +#lfn2pfn_algorithm_default = lsst_butler +#support = rucio-dev@cern.ch +#support_rucio = https://github.com/rucio/rucio/issues/ + +[credentials] +gcs = /opt/rucio/etc/google-cloud-storage-test.json + +[api] +endpoints = accountlimits, accounts, archives, auth, config, credentials, dids, dirac, export, heartbeats, identities, import, lifetime_exceptions, locks, meta, ping, redirect, replicas, requests, rses, rules, scopes, subscriptions, traces, vos + +[messaging-hermes-kafka] +nonssl_port = 9092 +use_ssl = False +brokers = kafka +message_filter = rucio.daemons.hermes.kafka.filters.filtered_list +topic_list = XRD1,XRD2,XRD3 diff --git a/lib/rucio/common/.github/workflows/build.yaml b/lib/rucio/common/.github/workflows/build.yaml new file mode 100644 index 0000000..40c3bcb --- /dev/null +++ b/lib/rucio/common/.github/workflows/build.yaml @@ -0,0 +1,43 @@ +name: Build Rucio Hermes Kafka container +on: + push: + tags: + - v* + pull_request: + +env: + INGESTD_NAME: rucio-daemons-hermesk + +jobs: + push: + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Build ingest image + working-directory: docker + run: | + docker-compose -f "docker-compose.yml" --env-file versions.env build rucio-daemons-hermesk + + - name: Log in to GitHub Container Registry + run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin + + - name: Push image + run: | + INGESTD_ID=ghcr.io/${{ github.repository_owner }}/$INGESTD_NAME + + # Strip git ref prefix from version + VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') + # Strip "v" prefix from tag name + [[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//') + # Use Docker `latest` tag convention + [ "$VERSION" == "main" ] && VERSION=latest + echo INGESTD_ID=$INGESTD_ID + echo VERSION=$VERSION + docker tag $INGESTD_NAME $INGESTD_ID:$VERSION + docker push $INGESTD_ID:$VERSION + diff --git a/lib/rucio/common/constants.py b/lib/rucio/common/constants.py new file mode 100644 index 0000000..04d403f --- /dev/null +++ b/lib/rucio/common/constants.py @@ -0,0 +1,157 @@ +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, 
Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +from collections import namedtuple +from typing import Literal, get_args + +from rucio.common.config import config_get_bool + +""" +Constants. + +""" + +RESERVED_KEYS = ['scope', 'name', 'account', 'did_type', 'is_open', 'monotonic', 'obsolete', 'complete', + 'availability', 'suppressed', 'bytes', 'length', 'md5', 'adler32', 'rule_evaluation_action', + 'rule_evaluation_required', 'expired_at', 'deleted_at', 'created_at', 'updated_at'] +# collection_keys = +# file_keys = + +KEY_TYPES = ['ALL', 'COLLECTION', 'FILE', 'DERIVED'] +# all(container, dataset, file), collection(dataset or container), file, derived(compute from file for collection) + +SCHEME_MAP = {'srm': ['srm', 'gsiftp'], + 'gsiftp': ['srm', 'gsiftp'], + 'https': ['https', 'davs', 'srm+https', 'cs3s'], + 'davs': ['https', 'davs', 'srm+https', 'cs3s'], + 'srm+https': ['https', 'davs', 'srm+https', 'cs3s'], + 'cs3s': ['https', 'davs', 'srm+https', 'cs3s'], + 'root': ['root'], + 'scp': ['scp'], + 'rsync': ['rsync'], + 'rclone': ['rclone']} +if config_get_bool('transfers', 'srm_https_compatibility', raise_exception=False, default=False): + SCHEME_MAP['srm'].append('https') + SCHEME_MAP['https'].append('srm') + SCHEME_MAP['srm'].append('davs') + SCHEME_MAP['davs'].append('srm') + +SUPPORTED_PROTOCOLS_LITERAL = Literal['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone', 'magnet'] +SUPPORTED_PROTOCOLS: list[str] = list(get_args(SUPPORTED_PROTOCOLS_LITERAL)) + +RSE_SUPPORTED_PROTOCOL_DOMAINS_LITERAL = Literal['ALL', 'LAN', 'WAN'] + +RSE_SUPPORTED_PROTOCOL_OPERATIONS_LITERAL = Literal['read', 'write', 'delete', 'third_party_copy_read', 'third_party_copy_write'] +RSE_SUPPORTED_PROTOCOL_OPERATIONS: list[str] = list(get_args(RSE_SUPPORTED_PROTOCOL_OPERATIONS_LITERAL)) + +FTS_STATE = namedtuple('FTS_STATE', ['SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', 'NOT_USED', + 'CANCELED'])('SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', + 'NOT_USED', 'CANCELED') + +FTS_COMPLETE_STATE = namedtuple('FTS_COMPLETE_STATE', ['OK', 'ERROR'])('Ok', 'Error') + +# https://gitlab.cern.ch/fts/fts3/-/blob/master/src/db/generic/Job.h#L41 +FTS_JOB_TYPE = namedtuple('FTS_JOB_TYPE', ['MULTIPLE_REPLICA', 'MULTI_HOP', 'SESSION_REUSE', 'REGULAR'])('R', 'H', 'Y', 'N') + + +# Messages constants + +MAX_MESSAGE_LENGTH = 4000 + + +class SuspiciousAvailability(enum.Enum): + ALL = 0 + EXIST_COPIES = 1 + LAST_COPY = 2 + + +class ReplicaState(enum.Enum): + # From rucio.db.sqla.constants, update that file at the same time as this + AVAILABLE = 'A' + UNAVAILABLE = 'U' + COPYING = 'C' + BEING_DELETED = 'B' + BAD = 'D' + TEMPORARY_UNAVAILABLE = 'T' + + +@enum.unique +class HermesService(str, enum.Enum): + """ + The services supported by Hermes2. + """ + INFLUX = "INFLUX" + ELASTIC = "ELASTIC" + EMAIL = "EMAIL" + ACTIVEMQ = "ACTIVEMQ" + KAFKA = "KAFKA" + + +class RseAttr: + + """ + List of functional RSE attributes. 
+ + This class acts as a namespace containing all RSE attributes referenced in + the Rucio source code. Setting them affects Rucio's behaviour in some way. + """ + + ARCHIVE_TIMEOUT = 'archive_timeout' + ASSOCIATED_SITES = 'associated_sites' + AUTO_APPROVE_BYTES = 'auto_approve_bytes' + AUTO_APPROVE_FILES = 'auto_approve_files' + BITTORRENT_TRACKER_ADDR = 'bittorrent_tracker_addr' + BLOCK_MANUAL_APPROVAL = 'block_manual_approval' + COUNTRY = 'country' + DECOMMISSION = 'decommission' + DEFAULT_ACCOUNT_LIMIT_BYTES = 'default_account_limit_bytes' + FTS = 'fts' + GLOBUS_ENDPOINT_ID = 'globus_endpoint_id' + GREEDYDELETION = 'greedyDeletion' + IS_OBJECT_STORE = 'is_object_store' + LFN2PFN_ALGORITHM = 'lfn2pfn_algorithm' + MAXIMUM_PIN_LIFETIME = 'maximum_pin_lifetime' + MULTIHOP_TOMBSTONE_DELAY = 'multihop_tombstone_delay' + NAMING_CONVENTION = 'naming_convention' + OIDC_BASE_PATH = 'oidc_base_path' + OIDC_SUPPORT = 'oidc_support' + PHYSGROUP = 'physgroup' + QBITTORRENT_MANAGEMENT_ADDRESS = 'qbittorrent_management_address' + RESTRICTED_READ = 'restricted_read' + RESTRICTED_WRITE = 'restricted_write' + RULE_APPROVERS = 'rule_approvers' + S3_URL_STYLE = 's3_url_style' + SIGN_URL = 'sign_url' + SIMULATE_MULTIRANGE = 'simulate_multirange' + SITE = 'site' + SKIP_UPLOAD_STAT = 'skip_upload_stat' + SOURCE_FOR_TOTAL_SPACE = 'source_for_total_space' + SOURCE_FOR_USED_SPACE = 'source_for_used_space' + STAGING_BUFFER = 'staging_buffer' + STAGING_REQUIRED = 'staging_required' + STRICT_COPY = 'strict_copy' + TOMBSTONE_DELAY = 'tombstone_delay' + TYPE = 'type' + USE_IPV4 = 'use_ipv4' + VERIFY_CHECKSUM = 'verify_checksum' + + # The following RSE attributes are exclusively used in the permission layer + # and are likely VO-specific. + + BLOCK_MANUAL_APPROVE = 'block_manual_approve' + CMS_TYPE = 'cms_type' + DEFAULT_LIMIT_FILES = 'default_limit_files' + QUOTA_APPROVERS = 'quota_approvers' + RULE_DELETERS = 'rule_deleters' diff --git a/lib/rucio/common/schema/lsst.py b/lib/rucio/common/schema/lsst.py new file mode 100644 index 0000000..d2f866b --- /dev/null +++ b/lib/rucio/common/schema/lsst.py @@ -0,0 +1,431 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from jsonschema import validate, ValidationError + +from rucio.common.exception import InvalidObject + +ACCOUNT_LENGTH = 25 + +ACCOUNT = {"description": "Account name", + "type": "string", + "pattern": "^[a-z0-9-_]{1,%s}$" % ACCOUNT_LENGTH} + +ACCOUNTS = {"description": "Array of accounts", + "type": "array", + "items": ACCOUNT, + "minItems": 0, + "maxItems": 1000} + + +ACCOUNT_TYPE = {"description": "Account type", + "type": "string", + "enum": ["USER", "GROUP", "SERVICE"]} + +ACTIVITY = {"description": "Activity name", + "type": "string", + "enum": ["Data Brokering", "Data Consolidation", "Data Rebalancing", + "Debug", "Express", "Functional Test", "Group Subscriptions", + "Production Input", "Production Output", + "Analysis Input", "Analysis Output", "Staging", + "T0 Export", "T0 Tape", "Upload/Download (Job)", + "Upload/Download (User)", "User Subscriptions", "Data Challenge"]} + +SCOPE_LENGTH = 25 + +SCOPE = {"description": "Scope name", + "type": "string", + "pattern": "^[a-zA-Z\.\-\\0-9/]{1,%s}$" % SCOPE_LENGTH} + +R_SCOPE = {"description": "Scope name", + "type": "string", + "pattern": "\\w"} + +NAME_LENGTH = 500 + +NAME = {"description": "Data Identifier name", + "type": "string", + "pattern": r"^[\w\.\-\+/]{1,%s}$" % NAME_LENGTH} + +R_NAME = {"description": "Data Identifier name", + "type": "string", + "pattern": "\\w"} + +LOCKED = {"description": "Rule locked status", + "type": ["boolean", "null"]} + +ASK_APPROVAL = {"description": "Rule approval request", + "type": ["boolean", "null"]} + +ASYNCHRONOUS = {"description": "Asynchronous rule creation", + "type": ["boolean", "null"]} + +DELAY_INJECTION = {"description": "Time (in seconds) to wait before starting applying the rule. Implies asynchronous rule creation.", + "type": ["integer", "null"]} + +PURGE_REPLICAS = {"description": "Rule purge replica status", + "type": "boolean"} + +IGNORE_AVAILABILITY = {"description": "Rule ignore availability status", + "type": "boolean"} + +RSE = {"description": "RSE name", + "type": "string", + "pattern": "^([A-Z0-9]+([_-][A-Z0-9]+)*)$"} + +RSE_ATTRIBUTE = {"description": "RSE attribute", + "type": "string", + "pattern": r'([A-Za-z0-9\._-]+[=<>][A-Za-z0-9_-]+)'} + +DEFAULT_RSE_ATTRIBUTE = {"description": "Default RSE attribute", + "type": "string", + "pattern": r'([A-Z0-9]+([_-][A-Z0-9]+)*)'} + +REPLICA_STATE = {"description": "Replica state", + "type": "string", + "enum": ["AVAILABLE", "UNAVAILABLE", "COPYING", "BEING_DELETED", "BAD", "SOURCE", "A", "U", "C", "B", "D", "S"]} + +DATE = {"description": "Date", + "type": "string", + "pattern": r'((Mon)|(Tue)|(Wed)|(Thu)|(Fri)|(Sat)|(Sun))[,]\s\d{2}\s(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s\d{4}\s(0\d|1\d|2[0-3])(\:)(0\d|1\d|2\d|3\d|4\d|5\d)(\:)(0\d|1\d|2\d|3\d|4\d|5\d)\s(UTC)'} + +DID_TYPE = {"description": "DID type", + "type": "string", + "enum": ["DATASET", "CONTAINER", "FILE", "F"]} + +GROUPING = {"description": "Rule grouping", + "type": ["string", "null"], + "enum": ["DATASET", "NONE", "ALL", None]} + +NOTIFY = {"description": "Rule notification setting", + "type": ["string", "null"], + "enum": ["Y", "C", "N", "P", None]} + +COMMENT = {"description": "Rule comment", + "type": ["string", "null"], + "maxLength": 250} + +METADATA = {"description": "Rule wfms metadata", + "type": ["string", "null"], + "maxLength": 3999} + +BYTES = {"description": "Size in bytes", + "type": "integer"} + +ADLER32 = {"description": "adler32", + "type": "string", + "pattern": "^[a-fA-F\\d]{8}$"} + +WEIGHT = {"description": "Rule weight", + "type": 
["string", "null"]} + +MD5 = {"description": "md5", + "type": "string", + "pattern": "^[a-fA-F\\d]{32}$"} + +UUID = {"description": "Universally Unique Identifier (UUID)", + "type": "string", + "pattern": '^(\\{){0,1}[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}(\\}){0,1}$'} + +META = {"description": "Data Identifier(DID) metadata", + "type": "object", + "properties": {"guid": UUID}, + "additionalProperties": True} + +PFN = {"description": "Physical File Name", "type": "string"} + +COPIES = {"description": "Number of replica copies", "type": "integer"} + +RSE_EXPRESSION = {"description": "RSE expression", "type": "string"} + +SOURCE_REPLICA_EXPRESSION = {"description": "RSE expression", "type": ["string", "null"]} + +LIFETIME = {"description": "Lifetime", "type": "number"} + +RULE_LIFETIME = {"description": "Rule lifetime", "type": ["number", "null"]} + +SUBSCRIPTION_ID = {"description": "Rule Subscription id", "type": ["string", "null"]} + +PRIORITY = {"description": "Priority of the transfers", + "type": "integer"} + +SPLIT_CONTAINER = {"description": "Rule split container mode", + "type": ["boolean", "null"]} + +TIME_ENTRY = { + "description": "Datetime, ISO 8601", + "type": "string", + "pattern": r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d*$' +} + +IP = { + "description": "Internet Protocol address v4, RFC 791", + "type": "string", + "pattern": r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}$' +} + +IPv4orIPv6 = { + "description": "IPv4 or IPv6 address", + "type": "string", + "format": "ipv4_or_ipv6" +} + +CLIENT_STATE = { + "description": "Client state", + "type": "string", + "enum": ['DONE', 'FAILED', 'PROCESSING', 'ALREADY_DONE', 'FILE_NOT_FOUND', 'FOUND_IN_PCACHE', 'DOWNLOAD_ATTEMPT', + 'FAIL_VALIDATE', 'FOUND_ROOT', 'ServiceUnavailable', 'SERVICE_ERROR', 'CP_TIMEOUT', 'COPY_ERROR', + 'STAGEIN_ATTEMPT_FAILED', 'SourceNotFound', 'MISSINGOUTPUTFILE', 'MD_MISMATCH', 'CHECKSUMCALCULATIONFAILURE', + 'MISSINGINPUT', 'MISSING_INPUT'] +} + +RULE = {"description": "Replication rule", + "type": "object", + "properties": {"dids": {"type": "array"}, + "account": ACCOUNT, + "copies": COPIES, + "rse_expression": RSE_EXPRESSION, + "grouping": GROUPING, + "weight": WEIGHT, + "lifetime": RULE_LIFETIME, + "locked": LOCKED, + "subscription_id": SUBSCRIPTION_ID, + "source_replica_expression": SOURCE_REPLICA_EXPRESSION, + "activity": ACTIVITY, + "notify": NOTIFY, + "purge_replicas": PURGE_REPLICAS, + "ignore_availability": IGNORE_AVAILABILITY, + "comment": COMMENT, + "ask_approval": ASK_APPROVAL, + "asynchronous": ASYNCHRONOUS, + "delay_injection": DELAY_INJECTION, + "priority": PRIORITY, + 'split_container': SPLIT_CONTAINER, + 'meta': METADATA}, + "required": ["dids", "copies", "rse_expression"], + "additionalProperties": False} + +RULES = {"description": "Array of replication rules", + "type": "array", + "items": RULE, + "minItems": 1, + "maxItems": 1000} + +COLLECTION_TYPE = {"description": "Dataset or container type", + "type": "string", + "enum": ["DATASET", "CONTAINER"]} + +COLLECTION = {"description": "Dataset or container", + "type": "object", + "properties": {"scope": SCOPE, + "name": NAME, + "type": COLLECTION_TYPE, + "meta": META, + "rules": RULES}, + "required": ["scope", "name", "type"], + "additionalProperties": False} + +COLLECTIONS = {"description": "Array of datasets or containers", + "type": "array", + "items": COLLECTION, + "minItems": 1, + "maxItems": 1000} + +DID = {"description": "Data Identifier(DID)", + "type": "object", + "properties": 
{"scope": SCOPE, + "name": NAME, + "type": DID_TYPE, + "meta": META, + "rules": RULES, + "bytes": BYTES, + "adler32": ADLER32, + "md5": MD5, + "state": REPLICA_STATE, + "pfn": PFN}, + "required": ["scope", "name"], + "additionalProperties": False} + +DID_FILTERS = {"description": "Array to filter DIDs by metadata", + "type": "array", + "additionalProperties": True} + +R_DID = {"description": "Data Identifier(DID)", + "type": "object", + "properties": {"scope": R_SCOPE, + "name": R_NAME, + "type": DID_TYPE, + "meta": META, + "rules": RULES, + "bytes": BYTES, + "adler32": ADLER32, + "md5": MD5, + "state": REPLICA_STATE, + "pfn": PFN}, + "required": ["scope", "name"], + "additionalProperties": False} + +DIDS = {"description": "Array of Data Identifiers(DIDs)", + "type": "array", + "items": DID, + "minItems": 1, + "maxItems": 1000} + +R_DIDS = {"description": "Array of Data Identifiers(DIDs)", + "type": "array", + "items": R_DID, + "minItems": 1, + "maxItems": 1000} + +ATTACHMENT = {"description": "Attachement", + "type": "object", + "properties": {"scope": SCOPE, + "name": NAME, + "rse": {"description": "RSE name", + "type": ["string", "null"], + "pattern": "^([A-Z0-9]+([_-][A-Z0-9]+)*)$"}, + "dids": DIDS}, + "required": ["dids"], + "additionalProperties": False} + +ATTACHMENTS = {"description": "Array of attachments", + "type": "array", + "items": ATTACHMENT, + "minItems": 1, + "maxItems": 1000} + +SUBSCRIPTION_FILTER = {"type": "object", + "properties": {"datatype": {"type": "array"}, + "prod_step": {"type": "array"}, + "stream_name": {"type": "array"}, + "project": {"type": "array"}, + "scope": {"type": "array"}, + "pattern": {"type": "string"}, + "excluded_pattern": {"type": "string"}, + "group": {"type": "string"}, + "provenance": {"type": "string"}, + "account": ACCOUNTS, + "grouping": {"type": "string"}, + "split_rule": {"type": "boolean"}}} + +ADD_REPLICA_FILE = {"description": "add replica file", + "type": "object", + "properties": {"scope": SCOPE, + "name": NAME, + "bytes": BYTES, + "adler32": ADLER32}, + "required": ["scope", "name", "bytes", "adler32"]} + +ADD_REPLICA_FILES = {"description": "add replica files", + "type": "array", + "items": ADD_REPLICA_FILE, + "minItems": 1, + "maxItems": 1000} + +CACHE_ADD_REPLICAS = {"description": "rucio cache add replicas", + "type": "object", + "properties": {"files": ADD_REPLICA_FILES, + "rse": RSE, + "lifetime": LIFETIME, + "operation": {"enum": ["add_replicas"]}}, + "required": ['files', 'rse', 'lifetime', 'operation']} + +DELETE_REPLICA_FILE = {"description": "delete replica file", + "type": "object", + "properties": {"scope": SCOPE, + "name": NAME}, + "required": ["scope", "name"]} + +DELETE_REPLICA_FILES = {"description": "delete replica files", + "type": "array", + "items": DELETE_REPLICA_FILE, + "minItems": 1, + "maxItems": 1000} + +CACHE_DELETE_REPLICAS = {"description": "rucio cache delete replicas", + "type": "object", + "properties": {"files": DELETE_REPLICA_FILES, + "rse": RSE, + "operation": {"enum": ["delete_replicas"]}}, + "required": ['files', 'rse', 'operation']} + +MESSAGE_OPERATION = {"type": "object", + "properties": {'operation': {"enum": ["add_replicas", "delete_replicas"]}}} + +ACCOUNT_ATTRIBUTE = {"description": "Account attribute", + "type": "string", + "pattern": r'^[a-zA-Z0-9-_\\/\\.]{1,30}$'} + +SCOPE_NAME_REGEXP = '/(\w.*?)/(.*)' + +DISTANCE = {"description": "RSE distance", + "type": "object", + "properties": { + "src_rse_id": {"type": "string"}, + "dest_rse_id": {"type": "string"}, + "ranking": {"type": 
"integer"} + }, + "required": ["src_rse_id", "dest_rse_id", "ranking"], + "additionalProperties": True} + +IMPORT = {"description": "import data into rucio.", + "type": "object", + "properties": { + "rses": { + "type": "object" + }, + "distances": { + "type": "object" + } + }} + +SCHEMAS = {'account': ACCOUNT, + 'account_type': ACCOUNT_TYPE, + 'activity': ACTIVITY, + 'name': NAME, + 'r_name': R_NAME, + 'rse': RSE, + 'rse_attribute': RSE_ATTRIBUTE, + 'scope': SCOPE, + 'r_scope': R_SCOPE, + 'did': DID, + 'did_filters': DID_FILTERS, + 'r_did': R_DID, + 'dids': DIDS, + 'rule': RULE, + 'r_dids': R_DIDS, + 'collection': COLLECTION, + 'collections': COLLECTIONS, + 'attachment': ATTACHMENT, + 'attachments': ATTACHMENTS, + 'subscription_filter': SUBSCRIPTION_FILTER, + 'cache_add_replicas': CACHE_ADD_REPLICAS, + 'cache_delete_replicas': CACHE_DELETE_REPLICAS, + 'account_attribute': ACCOUNT_ATTRIBUTE, + 'import': IMPORT} + + +def validate_schema(name, obj): + """ + Validate object against json schema + + :param name: The json schema name. + :param obj: The object to validate. + """ + try: + if obj: + validate(obj, SCHEMAS.get(name, {})) + except ValidationError as error: # NOQA, pylint: disable=W0612 + raise InvalidObject("Problem validating %(name)s : %(error)s" % locals()) diff --git a/lib/rucio/daemons/hermes/kafka/filters/default.py b/lib/rucio/daemons/hermes/kafka/filters/default.py new file mode 100644 index 0000000..de08fc3 --- /dev/null +++ b/lib/rucio/daemons/hermes/kafka/filters/default.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging + +class default: + def __init__(self, logger, producer, topics): + self.logger = logger + self.producer = producer + self.topics = topics + + def process(self, messages): + for topic in self.topics: + self._process(topic, messages) + + def _process(self, topic, messages): + to_delete = [] + msg_count = 0 + for message in messages: + try: + self.send_message(topic, message) + to_delete.append(message["id"]) + msg_count += 1 + except Exception as exc: + self.logger(logging.WARN, f"error sending: {message}: {exc}") + self.logger(logging.INFO, f"sent {msg_count} messages to Kafka") + return to_delete + + def send_message(self, topic, message): + d = { + "event_type": str(message["event_type"]).lower(), + "payload": message["payload"], + "created_at": str(message["created_at"]) + } + + logging.debug(f"kafka sending: {message}") + value = json.dumps(d) + self.producer.produce(topic, key="message", value=value) + self.producer.flush() diff --git a/lib/rucio/daemons/hermes/kafka/filters/filtered_list.py b/lib/rucio/daemons/hermes/kafka/filters/filtered_list.py new file mode 100644 index 0000000..f891ec5 --- /dev/null +++ b/lib/rucio/daemons/hermes/kafka/filters/filtered_list.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from rucio.client import Client
+from rucio.daemons.hermes.kafka.filters.default import default
+
+
+class filtered_list(default):
+
+    RUBIN_BUTLER = "rubin_butler"
+    RUBIN_SIDECAR = "rubin_sidecar"
+
+    def __init__(self, logger, producer, topics):
+        self.logger = logger
+        self.producer = producer
+        self.topics = topics
+        self.client = Client()
+
+        self.logger(logging.INFO, f'Messages will only be sent to Kafka topics: "{self.topics}"')
+
+    def process(self, messages):
+        """Process Hermes events, and send those matching the filter parameters to RSE host locations.
+
+        Parameters
+        ----------
+        messages : `list`
+            Hermes events to process
+        """
+        to_delete = []
+        msg_count = 0
+        discard_count = 0
+        # cycle through all the messages, applying the filter rules:
+        # only 'transfer-done' events whose destination RSE is in the
+        # configured topic list and whose DID carries rubin_butler
+        # metadata are forwarded
+        for message in messages:
+            # if the event_type isn't 'transfer-done', ignore this message
+            # and mark it for deletion
+            if str(message['event_type']).lower() != 'transfer-done':
+                discard_count += 1
+                to_delete.append(message['id'])
+                continue
+            try:
+                # get the destination RSE
+                destination = str(message['payload'].get('dst-rse'))
+
+                # check to see if the destination RSE is in the list
+                # of topics we're filtering. If it's not in the list,
+                # discard it and go on to the next message
+                if self.topics is not None:
+                    if destination not in self.topics:
+                        # this destination RSE wasn't in the list specified
+                        # in rucio.cfg, so discard it and go to the next
+                        # message
+                        discard_count += 1
+                        to_delete.append(message['id'])
+                        continue
+
+                # get the metadata for this file
+                scope = str(message['payload'].get('scope'))
+                name = str(message['payload'].get('name'))
+                metadata = self.client.get_metadata(plugin='ALL', scope=scope, name=name)
+                self.logger(logging.INFO, f"name and metadata: {name}: {metadata}")
+
+                # check to see if this file is intended to be ingested;
+                # if not, discard it
+                butler_ingest = metadata.get(self.RUBIN_BUTLER)
+                if butler_ingest is None:
+                    discard_count += 1
+                    to_delete.append(message['id'])
+                    continue
+                message['payload'][self.RUBIN_BUTLER] = butler_ingest
+
+                # check to see if there's sidecar metadata, and if there is,
+                # include it
+                butler_sidecar = metadata.get(self.RUBIN_SIDECAR)
+                if butler_sidecar is not None:
+                    message['payload'][self.RUBIN_SIDECAR] = butler_sidecar
+
+                # send the message to Kafka
+                self.send_message(topic=destination, message=message)
+
+                to_delete.append(message['id'])
+                msg_count += 1
+            except Exception as exc:
+                self.logger(logging.WARNING, f'error sending {message}: {exc}')
+
+        self.logger(logging.INFO, f'sent {msg_count} messages to Kafka')
+        self.logger(logging.INFO, f'discarded {discard_count} filtered messages')
+        return to_delete
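A file only makes it through this filter if its DID carries the rubin_butler metadata key. A minimal sketch of tagging a DID so that its transfer-done events are forwarded; the scope, name, and values here are hypothetical:

    from rucio.client import Client

    client = Client()
    # Mark the file for Butler ingestion; the filter copies this value into the payload.
    client.set_metadata(scope='raw', name='visit_001.fits', key='rubin_butler', value=1)
    # Optional sidecar metadata rides along in the payload under rubin_sidecar.
    client.set_metadata(scope='raw', name='visit_001.fits', key='rubin_sidecar', value='{"detector": 31}')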
diff --git a/lib/rucio/daemons/hermes/kafka/hermesk.py b/lib/rucio/daemons/hermes/kafka/hermesk.py
new file mode 100644
index 0000000..390bdf5
--- /dev/null
+++ b/lib/rucio/daemons/hermes/kafka/hermesk.py
@@ -0,0 +1,766 @@
+# -*- coding: utf-8 -*-
+# Copyright European Organization for Nuclear Research (CERN) since 2012
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+   Hermesk is a daemon that fetches queued messages and sends them to external services (InfluxDB, ES, ActiveMQ, Kafka).
+"""
+
+import calendar
+import datetime
+import functools
+import json
+import logging
+import random
+import re
+import smtplib
+import socket
+import sys
+import threading
+import time
+from configparser import NoOptionError, NoSectionError
+from email.mime.text import MIMEText
+from typing import TYPE_CHECKING
+
+import requests
+import stomp
+
+import rucio.db.sqla.util
+from rucio.common.config import (
+    config_get,
+    config_get_bool,
+    config_get_int,
+    config_get_list,
+)
+from rucio.common.exception import DatabaseException
+from rucio.common.logging import setup_logging
+from rucio.core.message import delete_messages, retrieve_messages
+from rucio.core.monitor import MetricManager
+from rucio.daemons.common import run_daemon
+from rucio.daemons.hermes.kafka.kafka_support import setup_kafka, deliver_to_kafka
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+    from types import FrameType
+    from typing import Optional
+
+    from rucio.daemons.common import HeartbeatHandler
+
+logging.getLogger("requests").setLevel(logging.CRITICAL)
+
+METRICS = MetricManager(module=__name__)
+graceful_stop = threading.Event()
+DAEMON_NAME = "hermesk"
+
+RECONNECT_COUNTER = METRICS.counter(
+    name="reconnect.{host}",
+    documentation="Counts Hermesk reconnects to different brokers",
+    labelnames=("host",),
+)
+
+
+def default(datetype):
+    # JSON serializer for datetime objects
+    if isinstance(datetype, (datetime.date, datetime.datetime)):
+        return datetype.isoformat()
+
+
+class HermesListener(stomp.ConnectionListener):
+    """
+    Hermes Listener
+    """
+
+    def __init__(self, broker):
+        """
+        __init__
+        """
+        self.__broker = broker
+
+    def on_error(self, frame):
+        """
+        Error handler
+        """
+        logging.error("[broker] [%s]: %s", self.__broker, frame.body)
+
+
+def setup_activemq(logger: "Callable"):
+    """
+    Set up the ActiveMQ broker connections
+
+    :param logger: The logger object.
+ """ + + logger(logging.INFO, "[broker] Resolving brokers") + + brokers_alias = [] + brokers_resolved = [] + try: + brokers_alias = [ + broker.strip() + for broker in config_get("messaging-hermes", "brokers").split(",") + ] + except: + raise Exception("Could not load brokers from configuration") + + logger(logging.INFO, "[broker] Resolving broker dns alias: %s", brokers_alias) + brokers_resolved = [] + for broker in brokers_alias: + try: + addrinfos = socket.getaddrinfo( + broker, 0, socket.AF_INET, 0, socket.IPPROTO_TCP + ) + brokers_resolved.extend(ai[4][0] for ai in addrinfos) + except socket.gaierror as ex: + logger( + logging.ERROR, + "[broker] Cannot resolve domain name %s (%s)", + broker, + str(ex), + ) + + logger(logging.DEBUG, "[broker] Brokers resolved to %s", brokers_resolved) + + if not brokers_resolved: + logger(logging.FATAL, "[broker] No brokers resolved.") + return None, None, None, None, None + + broker_timeout = 3 + if not broker_timeout: # Allow zero in config + broker_timeout = None + + logger(logging.INFO, "[broker] Checking authentication method") + use_ssl = True + try: + use_ssl = config_get_bool("messaging-hermes", "use_ssl") + except: + logger( + logging.INFO, + "[broker] Could not find use_ssl in configuration -- please update your rucio.cfg", + ) + + port = config_get_int("messaging-hermes", "port") + vhost = config_get("messaging-hermes", "broker_virtual_host", raise_exception=False) + if not use_ssl: + username = config_get("messaging-hermes", "username") + password = config_get("messaging-hermes", "password") + port = config_get_int("messaging-hermes", "nonssl_port") + + conns = [] + for broker in brokers_resolved: + if not use_ssl: + logger( + logging.INFO, + "[broker] setting up username/password authentication: %s", + broker, + ) + else: + logger( + logging.INFO, + "[broker] setting up ssl cert/key authentication: %s", + broker, + ) + + con = stomp.Connection12( + host_and_ports=[(broker, port)], + vhost=vhost, + keepalive=True, + timeout=broker_timeout, + ) + if use_ssl: + con.set_ssl( + key_file=config_get("messaging-hermes", "ssl_key_file"), + cert_file=config_get("messaging-hermes", "ssl_cert_file"), + ) + + con.set_listener( + "rucio-hermes", HermesListener(con.transport._Transport__host_and_ports[0]) + ) + + conns.append(con) + destination = config_get("messaging-hermes", "destination") + return conns, destination, username, password, use_ssl + + +def deliver_to_activemq( + messages, conns, destination, username, password, use_ssl, logger +): + """ + Deliver messages to ActiveMQ + + :param messages: The list of messages. + :param conns: A list of connections. + :param destination: The destination topic or queue. + :param username: The username if no SSL connection. + :param password: The username if no SSL connection. + :param use_ssl: Boolean to choose if SSL connection is used. + :param logger: The logger object. 
+ + :returns: List of message_id to delete + """ + to_delete = [] + for message in messages: + try: + conn = random.sample(conns, 1)[0] + if not conn.is_connected(): + host_and_ports = conn.transport._Transport__host_and_ports[0][0] + RECONNECT_COUNTER.labels(host=host_and_ports.split(".")[0]).inc() + if not use_ssl: + logger( + logging.INFO, + "[broker] - connecting with USERPASS to %s", + host_and_ports, + ) + conn.connect(username, password, wait=True) + else: + logger( + logging.INFO, + "[broker] - connecting with SSL to %s", + host_and_ports, + ) + conn.connect(wait=True) + + conn.send( + body=json.dumps( + { + "event_type": str(message["event_type"]).lower(), + "payload": message["payload"], + "created_at": str(message["created_at"]), + } + ), + destination=destination, + headers={ + "persistent": "true", + "event_type": str(message["event_type"]).lower(), + }, + ) + + to_delete.append(message["id"]) + except ValueError: + logger( + logging.ERROR, + "[broker] Cannot serialize payload to JSON: %s", + str(message["payload"]), + ) + to_delete.append(message["id"]) + continue + except stomp.exception.NotConnectedException as error: + logger( + logging.WARNING, + "[broker] Could not deliver message due to NotConnectedException: %s", + str(error), + ) + continue + except stomp.exception.ConnectFailedException as error: + logger( + logging.WARNING, + "[broker] Could not deliver message due to ConnectFailedException: %s", + str(error), + ) + continue + except Exception as error: + logger(logging.ERROR, "[broker] Could not deliver message: %s", str(error)) + continue + + if str(message["event_type"]).lower().startswith("transfer") or str( + message["event_type"] + ).lower().startswith("stagein"): + logger( + logging.DEBUG, + "[broker] - event_type: %s, scope: %s, name: %s, rse: %s, request-id: %s, transfer-id: %s, created_at: %s", + str(message["event_type"]).lower(), + message["payload"].get("scope", None), + message["payload"].get("name", None), + message["payload"].get("dst-rse", None), + message["payload"].get("request-id", None), + message["payload"].get("transfer-id", None), + str(message["created_at"]), + ) + + elif str(message["event_type"]).lower().startswith("dataset"): + logger( + logging.DEBUG, + "[broker] - event_type: %s, scope: %s, name: %s, rse: %s, rule-id: %s, created_at: %s)", + str(message["event_type"]).lower(), + message["payload"].get("scope", None), + message["payload"].get("name", None), + message["payload"].get("rse", None), + message["payload"].get("rule_id", None), + str(message["created_at"]), + ) + + elif str(message["event_type"]).lower().startswith("deletion"): + if "url" not in message["payload"]: + message["payload"]["url"] = "unknown" + logger( + logging.DEBUG, + "[broker] - event_type: %s, scope: %s, name: %s, rse: %s, url: %s, created_at: %s)", + str(message["event_type"]).lower(), + message["payload"].get("scope", None), + message["payload"].get("name", None), + message["payload"].get("rse", None), + message["payload"].get("url", None), + str(message["created_at"]), + ) + else: + logger(logging.DEBUG, "[broker] Other message: %s", message) + return to_delete + + +def deliver_emails(messages: list[dict], logger: "Callable") -> list: + """ + Sends emails + + :param messages: The list of messages. + :param logger: The logger object. 
+
+    :returns: List of message_id to delete
+    """
+
+    email_from = config_get("messaging-hermes", "email_from")
+    send_email = config_get_bool(
+        "messaging-hermes", "send_email", raise_exception=False, default=True
+    )
+    to_delete = []
+    for message in messages:
+        if message['event_type'] == 'email':
+            msg = MIMEText(message['payload']['body'])
+            msg['From'] = email_from
+            msg['To'] = ', '.join(message['payload']['to'])
+            msg['Subject'] = message['payload']['subject']
+
+            try:
+                if send_email:
+                    smtp = smtplib.SMTP()
+                    smtp.connect()
+                    smtp.sendmail(
+                        msg["From"], message["payload"]["to"], msg.as_string()
+                    )
+                    smtp.quit()
+                to_delete.append(message["id"])
+            except Exception as error:
+                logger(logging.ERROR, "Cannot send email : %s", str(error))
+        else:
+            to_delete.append(message["id"])
+            continue
+    return to_delete
+
+
+def submit_to_elastic(messages: list[dict], endpoint: str, logger: "Callable") -> int:
+    """
+    Submit a list of messages to ElasticSearch in a single bulk request
+
+    :param messages: The list of messages.
+    :param endpoint: The ES endpoint where to send the messages.
+    :param logger: The logger object.
+
+    :returns: HTTP status code. 200 and 204 OK. Rest is failure.
+    """
+    text = ""
+    for message in messages:
+        text += '{ "index":{ } }\n%s\n' % json.dumps(message, default=default)
+    res = requests.post(
+        endpoint, data=text, headers={"Content-Type": "application/json"}
+    )
+    return res.status_code
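The body built here follows the Elasticsearch bulk-API convention: one action line, then the serialized message, repeated per message. A sketch of what a single transfer-done message contributes to the POST body (field values are illustrative):

    { "index":{ } }
    {"event_type": "transfer-done", "created_at": "2024-01-01T12:00:00", "payload": {"scope": "raw", "name": "visit_001.fits", "dst-rse": "XRD1"}}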
+
+
+def aggregate_to_influx(
+    messages: list[dict], bin_size: str, endpoint: str, logger: "Callable"
+) -> int:
+    """
+    Aggregate a list of messages using a certain bin_size
+    and submit them to an InfluxDB endpoint
+
+    :param messages: The list of messages.
+    :param bin_size: The size of the bins for the aggregation (e.g. 10m, 1h, etc.).
+    :param endpoint: The InfluxDB endpoint where to send the messages.
+    :param logger: The logger object.
+
+    :returns: HTTP status code. 200 and 204 OK. Rest is failure.
+    """
+    bins = {}
+    dtime = datetime.datetime.now()
+    microsecond = dtime.microsecond
+
+    for message in messages:
+        event_type = message["event_type"]
+        payload = message["payload"]
+        if event_type in ["transfer-failed", "transfer-done"]:
+            if not payload["transferred_at"]:
+                logger(
+                    logging.WARNING,
+                    "No transferred_at for message. Reason : %s",
+                    payload["reason"],
+                )
+                continue
+            transferred_at = time.strptime(
+                payload["transferred_at"], "%Y-%m-%d %H:%M:%S"
+            )
+            if bin_size == "1m":
+                transferred_at = int(calendar.timegm(transferred_at)) * 1000000000
+                transferred_at += microsecond
+            if transferred_at not in bins:
+                bins[transferred_at] = {}
+            src_rse, dest_rse, activity = (
+                payload["src-rse"],
+                payload["dst-rse"],
+                payload["activity"],
+            )
+            activity = re.sub(" ", r"\ ", activity)
+            key = "transfer,activity=%s,src_rse=%s,dst_rse=%s" % (
+                activity,
+                src_rse,
+                dest_rse,
+            )
+            if key not in bins[transferred_at]:
+                bins[transferred_at][key] = [0, 0, 0, 0]
+            if event_type == "transfer-done":
+                bins[transferred_at][key][0] += 1
+                bins[transferred_at][key][1] += payload["bytes"]
+            if event_type == "transfer-failed":
+                bins[transferred_at][key][2] += 1
+                bins[transferred_at][key][3] += payload["bytes"]
+        elif event_type in ["deletion-failed", "deletion-done"]:
+            created_at = message["created_at"]
+            if bin_size == "1m":
+                created_at = created_at.replace(
+                    second=0, microsecond=0, tzinfo=datetime.timezone.utc
+                ).timestamp()
+                created_at = int(created_at) * 1000000000
+                created_at += microsecond
+            if created_at not in bins:
+                bins[created_at] = {}
+            rse = payload["rse"]
+            key = "deletion,rse=%s" % (rse)
+            if key not in bins[created_at]:
+                bins[created_at][key] = [0, 0, 0, 0]
+            if event_type == "deletion-done":
+                bins[created_at][key][0] += 1
+                bins[created_at][key][1] += payload["bytes"]
+            if event_type == "deletion-failed":
+                bins[created_at][key][2] += 1
+                bins[created_at][key][3] += payload["bytes"]
+    points = ""
+    for timestamp in bins:
+        for entry in bins[timestamp]:
+            metrics = bins[timestamp][entry]
+            event_type = entry.split(",")[0]
+            point = (
+                "%s nb_%s_done=%s,bytes_%s_done=%s,nb_%s_failed=%s,bytes_%s_failed=%s %s"
+                % (
+                    entry,
+                    event_type,
+                    metrics[0],
+                    event_type,
+                    metrics[1],
+                    event_type,
+                    metrics[2],
+                    event_type,
+                    metrics[3],
+                    timestamp,
+                )
+            )
+            points += point
+            points += "\n"
+    headers = {}
+    influx_token = config_get("hermes", "influxdb_token", False, None)
+    if influx_token:
+        headers = {"Authorization": "Token %s" % influx_token}
+    if points:
+        res = requests.post(endpoint, headers=headers, data=points)
+        logger(logging.DEBUG, "%s", str(res.text))
+        return res.status_code
+    return 204
+
+
+def hermesk(once: bool = False, bulk: int = 1000, sleep_time: int = 10) -> None:
+    """
+    Creates a Hermesk worker that can submit messages to different services (InfluxDB, ElasticSearch, ActiveMQ, Kafka).
+    The list of services needs to be defined in the services_list option of the hermes section.
+    The list of endpoints needs to be defined in rucio.cfg, in the hermes section.
+
+    :param once: Run only once.
+    :param bulk: The number of requests to process.
+    :param sleep_time: Time between two cycles.
+ """ + run_daemon( + once=once, + graceful_stop=graceful_stop, + executable=DAEMON_NAME, + partition_wait_time=1, + sleep_time=sleep_time, + run_once_fnc=functools.partial( + run_once, + bulk=bulk, + ), + ) + + +def run_once(heartbeat_handler: "HeartbeatHandler", bulk: int, **_kwargs) -> bool: + + worker_number, total_workers, logger = heartbeat_handler.live() + try: + services_list = config_get_list("hermes", "services_list") + except (NoOptionError, NoSectionError, RuntimeError): + logger(logging.DEBUG, "No services found, exiting") + sys.exit(1) + + if "influx" in services_list: + influx_endpoint = None + try: + influx_endpoint = config_get("hermes", "influxdb_endpoint", False, None) + if not influx_endpoint: + logger( + logging.ERROR, + "InfluxDB defined in the services list, but no endpoint can be found", + ) + except Exception as err: + logger(logging.ERROR, str(err)) + if "elastic" in services_list: + elastic_endpoint = None + try: + elastic_endpoint = config_get("hermes", "elastic_endpoint", False, None) + if not elastic_endpoint: + logger( + logging.ERROR, + "Elastic defined in the services list, but no endpoint can be found", + ) + except Exception as err: + logger(logging.ERROR, str(err)) + conns = None + if "activemq" in services_list: + try: + conns, destination, username, password, use_ssl = setup_activemq(logger) + if not conns: + logger( + logging.ERROR, + "ActiveMQ defined in the services list, cannot be setup", + ) + except Exception as err: + logger(logging.ERROR, str(err)) + + message_filter = None + if "kafka" in services_list: + try: + message_filter = setup_kafka(logger) + except Exception as err: + logging.exception(err) + + worker_number, total_workers, logger = heartbeat_handler.live() + message_dict = {} + message_ids = [] + start_time = time.time() + messages = retrieve_messages( + bulk=bulk, + old_mode=False, + thread=worker_number, + total_threads=total_workers, + ) + + msg_num = 0 + to_delete = [] + if messages: + for message in messages: + service = message["services"] + if service not in message_dict: + message_dict[service] = [] + message_dict[service].append(message) + message_ids.append(message["id"]) + logger( + logging.DEBUG, + "Retrieved %i messages retrieved in %s seconds", + len(messages), + time.time() - start_time, + ) + + if "influx" in message_dict and influx_endpoint: + # For influxDB, bulk submission, either everything succeeds or fails + t_time = time.time() + logger(logging.DEBUG, "Will submit to influxDB") + try: + state = aggregate_to_influx( + messages=message_dict["influx"], + bin_size="1m", + endpoint=influx_endpoint, + logger=logger, + ) + if state in [204, 200]: + logger( + logging.INFO, + "%s messages successfully submitted to influxDB in %s seconds", + len(message_dict["influx"]), + time.time() - t_time, + ) + for message in message_dict["influx"]: + to_delete.append(message) + else: + logger( + logging.ERROR, + "Failure to submit %s messages to influxDB. 
Returned status: %s",
+                    len(message_dict["influx"]),
+                    state,
+                )
+        except Exception as error:
+            logger(logging.ERROR, "Error sending to InfluxDB : %s", str(error))
+
+    if "elastic" in message_dict and elastic_endpoint:
+        # For elastic, bulk submission, either everything succeeds or fails
+        t_time = time.time()
+        try:
+            state = submit_to_elastic(
+                messages=message_dict["elastic"],
+                endpoint=elastic_endpoint,
+                logger=logger,
+            )
+            if state in [200, 204]:
+                logger(
+                    logging.INFO,
+                    "%s messages successfully submitted to elastic in %s seconds",
+                    len(message_dict["elastic"]),
+                    time.time() - t_time,
+                )
+                for message in message_dict["elastic"]:
+                    to_delete.append(message)
+            else:
+                logger(
+                    logging.ERROR,
+                    "Failure to submit %s messages to elastic. Returned status: %s",
+                    len(message_dict["elastic"]),
+                    state,
+                )
+        except Exception as error:
+            logger(logging.ERROR, "Error sending to Elastic : %s", str(error))
+
+    if "email" in message_dict:
+        t_time = time.time()
+        try:
+            messages_sent = deliver_emails(
+                messages=message_dict["email"], logger=logger
+            )
+            logger(
+                logging.INFO,
+                "%s messages successfully submitted by emails in %s seconds",
+                len(message_dict["email"]),
+                time.time() - t_time,
+            )
+            for message in message_dict["email"]:
+                if message["id"] in messages_sent:
+                    to_delete.append(message)
+        except Exception as error:
+            logger(logging.ERROR, "Error sending email : %s", str(error))
+
+    if "activemq" in message_dict and conns:
+        t_time = time.time()
+        try:
+            messages_sent = deliver_to_activemq(
+                messages=message_dict["activemq"],
+                conns=conns,
+                destination=destination,
+                username=username,
+                password=password,
+                use_ssl=use_ssl,
+                logger=logger,
+            )
+            logger(
+                logging.INFO,
+                "%s messages successfully submitted to ActiveMQ in %s seconds",
+                len(message_dict["activemq"]),
+                time.time() - t_time,
+            )
+            for message in message_dict["activemq"]:
+                if message["id"] in messages_sent:
+                    to_delete.append(message)
+        except Exception as error:
+            logger(logging.ERROR, "Error sending to ActiveMQ : %s", str(error))
+
+    if "kafka" in message_dict and message_filter:
+        t_time = time.time()
+        try:
+            messages_sent = deliver_to_kafka(
+                message_filter=message_filter, messages=message_dict["kafka"]
+            )
+            logger(
+                logging.INFO,
+                "%s messages processed by Kafka filter in %s seconds",
+                len(message_dict["kafka"]),
+                time.time() - t_time,
+            )
+            for message in message_dict["kafka"]:
+                if message["id"] in messages_sent:
+                    to_delete.append(message)
+        except Exception as error:
+            logger(logging.ERROR, "Error sending to Kafka : %s", str(error))
+
+    logger(logging.INFO, "Deleting %s messages", len(to_delete))
+    to_delete = [
+        {
+            "id": message["id"],
+            "created_at": message["created_at"],
+            "updated_at": message["created_at"],
+            "payload": str(message["payload"]),
+            "event_type": message["event_type"],
+        }
+        for message in to_delete
+    ]
+    delete_messages(messages=to_delete)
+    must_sleep = True
+    return must_sleep


+def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
+    """
+    Graceful exit.
+    """
+    logging.info("Caught CTRL-C - waiting for cycle to end before shutting down")
+    graceful_stop.set()
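For a quick functional test, for instance inside the development compose setup above, the worker loop can also be driven directly instead of going through run(); a minimal sketch, assuming a reachable database and a populated [hermes] services_list:

    from rucio.daemons.hermes.kafka.hermesk import hermesk

    # Drain up to 100 queued messages in a single pass, then return.
    hermesk(once=True, bulk=100, sleep_time=1)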
+ """ + setup_logging(process_name=DAEMON_NAME) + + if rucio.db.sqla.util.is_old_db(): + raise DatabaseException("Database was not updated, daemon won't start") + + logging.info("starting hermesk threads") + thread_list = [ + threading.Thread( + target=hermesk, + kwargs={ + "once": once, + "bulk": bulk, + "sleep_time": sleep_time, + }, + ) + for _ in range(0, threads) + ] + + for thrd in thread_list: + thrd.start() + + logging.debug(thread_list) + # Interruptible joins require a timeout. + while thread_list: + thread_list = [ + thread.join(timeout=3.14) + for thread in thread_list + if thread and thread.is_alive() + ] diff --git a/lib/rucio/daemons/hermes/kafka/kafka_support.py b/lib/rucio/daemons/hermes/kafka/kafka_support.py new file mode 100644 index 0000000..b7b1c40 --- /dev/null +++ b/lib/rucio/daemons/hermes/kafka/kafka_support.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" + Hermes2 is a daemon that get the messages and sends them to external services (influxDB, ES, ActiveMQ). +""" + +from confluent_kafka import Producer +import importlib +import json +import logging +import socket + +from rucio.common.config import ( + config_get, + config_get_int, + config_get_bool, + config_get_list, +) + +mylog = None +default_filter_class_name = "rucio.daemons.hermes.kafka.filters.default" + +def setup_kafka(logger): + """ + Deliver messages to Kafka + + :param logger: The logger object. 
+ """ + + logger(logging.INFO, "[broker-kafka] Resolving brokers") + + # the following retrieves all rucio.cfg information + + # get broker name + try: + brokers = config_get("messaging-hermes-kafka", "brokers") + except Exception: + raise Exception("Could not load 'brokers' from configuration") + + # check to see if ssl is being used to authenticate + logger(logging.INFO, "[broker] Checking authentication method") + try: + use_ssl = config_get_bool("messaging-hermes-kafka", "use_ssl") + except Exception: + logger(logging.INFO, "[broker] Could not find use_ssl in configuration -- update your rucio.cfg") + + # if ssl is used, get the reset of the params we'll use to authenticate + if use_ssl: + ca_cert = config_get_bool("messaging-hermes-kafka", "ca_cert") + certfile = config_get_bool("messaging-hermes-kafka", "certfile") + keyfile = config_get_bool("messaging-hermes-kafka", "keyfile") + # get the username and password, if specified + else: + username = config_get("messaging-hermes-kafka", "username", raise_exception=False, default=None) + password = config_get("messaging-hermes-kafka", "password", raise_exception=False, default=None) + + config = { 'bootstrap.servers': f'{brokers}', + 'client.id': socket.gethostname(), + } + + # configure to use SSL + if use_ssl: + logger(logging.INFO, "[broker-kafka] use_ssl") + producer = Producer(bootstrap_servers=f'{brokers}', + security_protocol="SSL", + ssl_cafile=ca_cert, + ssl_certfile=certfile, + ssl_keyfile=keyfile) + elif username is not None: + logger(logging.INFO, "[broker-kafka] username") + producer = Producer(bootstrap_servers=f'{brokers}', + sasl_username=username, + sasl_password=password) + else: + logger(logging.INFO, f"[broker-kafka] plain {config}") + producer = Producer(config) + + + # check to see if a message filter is specified; if it isn't, use the default + filter_name = config_get("messaging-hermes-kafka", "message_filter", raise_exception=False, default=None) + if filter_name is None: + logger(logging.WARN, f'no message_filter specified, using "{default_filter_class_name}"') + filter_name = default_filter_class_name + else: + logger(logging.INFO, f'message_filter set to "{filter_name}"') + + # retrieve the topic list. + topic_list = config_get_list( + "messaging-hermes-kafka", "topic_list", raise_exception=False, default=None + ) + + if topic_list is None: + logger( + logging.INFO, + "no topic_list specified, sending to all named RSEs as topics", + ) + + # create the class used for message filtering + message_filter = None + try: + message_filter_class = create_class(filter_name) + message_filter = message_filter_class(logger, producer, topic_list) + except Exception as e: + logging.exception(e) + + return message_filter + + +def create_class(class_name): + """Create a class specified by class_name + """ + + name = class_name.split(".")[-1] + filter_class_name = class_name + "." + name + + dot = filter_class_name.rindex(".") + module_name = filter_class_name[0:dot] + _class_name = filter_class_name[dot + 1 :] + + classobj = getattr(importlib.import_module(module_name), _class_name) + if classobj is None: + raise RuntimeError( + 'Attempt to instantiate class "' + + name + + '" failed. Could not find that class.' + ) + + return classobj + + +def deliver_to_kafka(message_filter, messages): + """ + Deliver messages to Kafka + + :param message_filter: Message filtering object. + :param messages: The list of messages. 
+ + :returns: List of message_id to delete + """ + + to_delete = message_filter.process(messages) + return to_delete diff --git a/lib/rucio/rse/protocols/34.0.0/protocol.py b/lib/rucio/rse/protocols/34.0.0/protocol.py new file mode 100644 index 0000000..9813992 --- /dev/null +++ b/lib/rucio/rse/protocols/34.0.0/protocol.py @@ -0,0 +1,538 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module defines the base class for implementing a transfer protocol, +along with some of the default methods for LFN2PFN translations. +""" + +import hashlib +import logging +from configparser import NoOptionError, NoSectionError +from urllib.parse import urlparse + +from rucio.common import config, exception +from rucio.common.utils import register_policy_package_algorithms +from rucio.rse import rsemanager + +if getattr(rsemanager, 'CLIENT_MODE', None): + from rucio.client.rseclient import RSEClient + +if getattr(rsemanager, 'SERVER_MODE', None): + from rucio.common.types import InternalScope + from rucio.core import replica + from rucio.core.rse import get_rse_vo + + +class RSEDeterministicTranslation(object): + """ + Execute the logic for translating a LFN to a path. + """ + + _LFN2PFN_ALGORITHMS = {} + _DEFAULT_LFN2PFN = "hash" + + def __init__(self, rse=None, rse_attributes=None, protocol_attributes=None): + """ + Initialize a translator object from the RSE, its attributes, and the protocol-specific + attributes. + + :param rse: Name of RSE for this translation. + :param rse_attributes: A dictionary of RSE-specific attributes for use in the translation. + :param protocol_attributes: A dictionary of RSE/protocol-specific attributes. + """ + self.rse = rse + self.rse_attributes = rse_attributes if rse_attributes else {} + self.protocol_attributes = protocol_attributes if protocol_attributes else {} + self.loaded_policy_modules = False + + @classmethod + def supports(cls, name): + """ + Check to see if a specific algorithm is supported. + + :param name: Name of the deterministic algorithm. + :returns: True if `name` is an algorithm supported by the translator class, False otherwise. + """ + return name in cls._LFN2PFN_ALGORITHMS + + @staticmethod + def register(lfn2pfn_callable, name=None): + """ + Provided a callable function, register it as one of the valid LFN2PFN algorithms. + + The callable will receive five arguments: + - scope: Scope of the LFN. + - name: LFN's path name + - rse: RSE name the translation is being done for. + - rse_attributes: Attributes of the RSE. + - protocol_attributes: Attributes of the RSE's protocol + The return value should be the last part of the PFN - it will be appended to the + rest of the URL. + + :param lfn2pfn_callable: Callable function to use for generating paths. + :param name: Algorithm name used for registration. If None, then `lfn2pfn_callable.__name__` is used. 
+ """ + if name is None: + name = lfn2pfn_callable.__name__ + RSEDeterministicTranslation._LFN2PFN_ALGORITHMS[name] = lfn2pfn_callable + + @staticmethod + def __hash(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, turn it into a sub-directory structure using a hash function. + + This takes the MD5 of the LFN and uses the first four characters as a subdirectory + name. + + :param scope: Scope of the LFN. + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del rse + del rse_attrs + del protocol_attrs + hstr = hashlib.md5(('%s:%s' % (scope, name)).encode('utf-8')).hexdigest() + if scope.startswith('user') or scope.startswith('group'): + scope = scope.replace('.', '/') + return '%s/%s/%s/%s' % (scope, hstr[0:2], hstr[2:4], name) + + @staticmethod + def __identity(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, convert it directly to a path using the mapping: + + scope:path -> scope/path + + :param scope: Scope of the LFN. + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del rse + del rse_attrs + del protocol_attrs + if scope.startswith('user') or scope.startswith('group'): + scope = scope.replace('.', '/') + return '%s/%s' % (scope, name) + + @staticmethod + def __belleii(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, convert it directly to a path using the mapping: + + path -> path + This is valid only for the belleii convention where the scope can be determined + from the LFN using a determinitic function. + + :param scope: Scope of the LFN. + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del scope + del rse + del rse_attrs + del protocol_attrs + return name + + @staticmethod + def __ligo(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, convert it directly to a path using the Caltech schema + + e.g.,: ER8:H-H1_HOFT_C02-1126256640-4096 -> + ER8/hoft_C02/H1/H-H1_HOFT_C02-11262/H-H1_HOFT_C02-1126256640-4096 + + :param scope: Scope of the LFN (observing run: ER8, O2, postO1, ...) + :param name: File name of the LFN (E.g., H-H1_HOFT_C02-1126256640-4096.gwf) + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del rse + del rse_attrs + del protocol_attrs + from ligo_rucio import lfn2pfn as ligo_lfn2pfn # pylint: disable=import-error + return ligo_lfn2pfn.ligo_lab(scope, name, None, None, None) + + @staticmethod + def __xenon(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, turn it into a two level sub-directory structure based on the scope + plus a third level based on the name + :param scope: Scope of the LFN. + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. 
+ """ + del rse + del rse_attrs + del protocol_attrs + + return '%s/%s/%s/%s' % (scope[0:7], scope[4:len(scope)], name.split('-')[0] + "-" + name.split('-')[1], name) + + @staticmethod + def __lsst(scope, name, rse, rse_attrs, protocol_attrs): + """ + LFN2PFN algorithm for Rubin-LSST in the ESCAPE project + + Replace convention delimiter '__' by '/' + The Escape instance does use the 'generic' Rucio schema. + + :param scope: Scope of the LFN (ignored) + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del scope + del rse + del rse_attrs + del protocol_attrs + return name.replace('__', '/') + + @staticmethod + def __rubinbutler(scope, name, rse, rse_attrs, protocol_attrs): + del rse + del rse_attrs + del protocol_attrs + return '%s' % name + + @classmethod + def _module_init_(cls): + """ + Initialize the class object on first module load. + """ + cls.register(cls.__hash, "hash") + cls.register(cls.__identity, "identity") + cls.register(cls.__ligo, "ligo") + cls.register(cls.__belleii, "belleii") + cls.register(cls.__xenon, "xenon") + cls.register(cls.__lsst, "lsst") + cls.register(cls.__rubinbutler, "rubinbutler") + policy_module = None + try: + policy_module = config.config_get('policy', 'lfn2pfn_module') + except (NoOptionError, NoSectionError): + pass + if policy_module: + # TODO: The import of importlib is done like this due to a dependency issue with python 2.6 and incompatibility of the module with py3.x + # More information https://github.com/rucio/rucio/issues/875 + import importlib + importlib.import_module(policy_module) + + cls._DEFAULT_LFN2PFN = config.get_lfn2pfn_algorithm_default() + + def path(self, scope, name): + """ Transforms the logical file name into a PFN's path. + + :param lfn: filename + :param scope: scope + + :returns: RSE specific URI of the physical file + """ + # on first call, register any lfn2pfn algorithms from the policy package(s) (server only) + if getattr(rsemanager, 'SERVER_MODE', None) and not self.loaded_policy_modules: + register_policy_package_algorithms('lfn2pfn', RSEDeterministicTranslation._LFN2PFN_ALGORITHMS) + self.loaded_policy_modules = True + + algorithm = self.rse_attributes.get('lfn2pfn_algorithm', 'default') + if algorithm == 'default': + algorithm = RSEDeterministicTranslation._DEFAULT_LFN2PFN + algorithm_callable = RSEDeterministicTranslation._LFN2PFN_ALGORITHMS[algorithm] + return algorithm_callable(scope, name, self.rse, self.rse_attributes, self.protocol_attributes) + + +RSEDeterministicTranslation._module_init_() # pylint: disable=protected-access + + +class RSEProtocol(object): + """ This class is virtual and acts as a base to inherit new protocols from. It further provides some common functionality which applies for the amjority of the protocols.""" + + def __init__(self, protocol_attr, rse_settings, logger=logging.log): + """ Initializes the object with information about the referred RSE. + + :param protocol_attr: Properties of the requested protocol. + :param rse_settting: The RSE settings. + :param logger: Optional decorated logger that can be passed from the calling daemons or servers. 
+ """ + self.auth_token = protocol_attr['auth_token'] + protocol_attr.pop('auth_token') + self.attributes = protocol_attr + self.translator = None + self.renaming = True + self.overwrite = False + self.rse = rse_settings + self.logger = logger + if self.rse['deterministic']: + self.translator = RSEDeterministicTranslation(self.rse['rse'], rse_settings, self.attributes) + if getattr(rsemanager, 'CLIENT_MODE', None) and \ + not RSEDeterministicTranslation.supports(self.rse.get('lfn2pfn_algorithm')): + # Remote server has an algorithm we don't understand; always make the server do the lookup. + setattr(self, 'lfns2pfns', self.__lfns2pfns_client) + else: + if getattr(rsemanager, 'CLIENT_MODE', None): + setattr(self, 'lfns2pfns', self.__lfns2pfns_client) + if getattr(rsemanager, 'SERVER_MODE', None): + setattr(self, '_get_path', self._get_path_nondeterministic_server) + + def lfns2pfns(self, lfns): + """ + Retruns a fully qualified PFN for the file referred by path. + + :param path: The path to the file. + + :returns: Fully qualified PFN. + """ + pfns = {} + prefix = self.attributes['prefix'] + + if not prefix.startswith('/'): + prefix = ''.join(['/', prefix]) + if not prefix.endswith('/'): + prefix = ''.join([prefix, '/']) + + lfns = [lfns] if isinstance(lfns, dict) else lfns + for lfn in lfns: + scope, name = str(lfn['scope']), lfn['name'] + if 'path' in lfn and lfn['path'] is not None: + pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'], + '://', + self.attributes['hostname'], + ':', + str(self.attributes['port']), + prefix, + lfn['path'] if not lfn['path'].startswith('/') else lfn['path'][1:] + ]) + else: + try: + pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'], + '://', + self.attributes['hostname'], + ':', + str(self.attributes['port']), + prefix, + self._get_path(scope=scope, name=name) + ]) + except exception.ReplicaNotFound as e: + self.logger(logging.WARNING, str(e)) + return pfns + + def __lfns2pfns_client(self, lfns): + """ Provides the path of a replica for non-deterministic sites. Will be assigned to get path by the __init__ method if neccessary. + + :param scope: list of DIDs + + :returns: dict with scope:name as keys and PFN as value (in case of errors the Rucio exception si assigned to the key) + """ + client = RSEClient() # pylint: disable=E0601 + + lfns = [lfns] if isinstance(lfns, dict) else lfns + lfn_query = ["%s:%s" % (lfn['scope'], lfn['name']) for lfn in lfns] + return client.lfns2pfns(self.rse['rse'], lfn_query, scheme=self.attributes['scheme']) + + def _get_path(self, scope, name): + """ Transforms the logical file name into a PFN. + Suitable for sites implementing the RUCIO naming convention. + This implementation is only invoked if the RSE is deterministic. + + :param scope: scope + :param name: filename + + :returns: RSE specific URI of the physical file + """ + return self.translator.path(scope, name) + + def _get_path_nondeterministic_server(self, scope, name): # pylint: disable=invalid-name + """ Provides the path of a replica for non-deterministic sites. Will be assigned to get path by the __init__ method if neccessary. 
""" + vo = get_rse_vo(self.rse['id']) # pylint: disable=E0601 + scope = InternalScope(scope, vo=vo) # pylint: disable=E0601 + rep = replica.get_replica(scope=scope, name=name, rse_id=self.rse['id']) # pylint: disable=E0601 + if 'path' in rep and rep['path'] is not None: + path = rep['path'] + elif 'state' in rep and (rep['state'] is None or rep['state'] == 'UNAVAILABLE'): + raise exception.ReplicaUnAvailable('Missing path information and state is UNAVAILABLE for replica %s:%s on non-deterministic storage named %s' % (scope, name, self.rse['rse'])) + else: + raise exception.ReplicaNotFound('Missing path information for replica %s:%s on non-deterministic storage named %s' % (scope, name, self.rse['rse'])) + if path.startswith('/'): + path = path[1:] + if path.endswith('/'): + path = path[:-1] + return path + + def parse_pfns(self, pfns): + """ + Splits the given PFN into the parts known by the protocol. It is also checked if the provided protocol supportes the given PFNs. + + :param pfns: a list of a fully qualified PFNs + + :returns: dic with PFN as key and a dict with path and name as value + + :raises RSEFileNameNotSupported: if the provided PFN doesn't match with the protocol settings + """ + ret = dict() + pfns = [pfns] if isinstance(pfns, str) else pfns + + for pfn in pfns: + parsed = urlparse(pfn) + scheme = parsed.scheme + hostname = parsed.netloc.partition(':')[0] + port = int(parsed.netloc.partition(':')[2]) if parsed.netloc.partition(':')[2] != '' else 0 + while '//' in parsed.path: + parsed = parsed._replace(path=parsed.path.replace('//', '/')) + path = parsed.path + prefix = self.attributes['prefix'] + while '//' in prefix: + prefix = prefix.replace('//', '/') + + # Protect against 'lazy' defined prefixes for RSEs in the repository + if not prefix.startswith('/'): + prefix = '/' + prefix + if not prefix.endswith('/'): + prefix += '/' + + if self.attributes['hostname'] != hostname: + if self.attributes['hostname'] != 'localhost': # In the database empty hostnames are replaced with localhost but for some URIs (e.g. file) a hostname is not included + raise exception.RSEFileNameNotSupported('Invalid hostname: provided \'%s\', expected \'%s\'' % (hostname, self.attributes['hostname'])) + + if self.attributes['port'] != port: + raise exception.RSEFileNameNotSupported('Invalid port: provided \'%s\', expected \'%s\'' % (port, self.attributes['port'])) + + if not path.startswith(prefix): + raise exception.RSEFileNameNotSupported('Invalid prefix: provided \'%s\', expected \'%s\'' % ('/'.join(path.split('/')[0:len(prefix.split('/')) - 1]), + prefix)) # len(...)-1 due to the leading '/ + + # Spliting parsed.path into prefix, path, filename + path = path.partition(prefix)[2] + name = path.split('/')[-1] + path = '/'.join(path.split('/')[:-1]) + if not path.startswith('/'): + path = '/' + path + if path != '/' and not path.endswith('/'): + path = path + '/' + ret[pfn] = {'path': path, 'name': name, 'scheme': scheme, 'prefix': prefix, 'port': port, 'hostname': hostname, } + + return ret + + def exists(self, path): + """ + Checks if the requested file is known by the referred RSE. + + :param path: Physical file name + + :returns: True if the file exists, False if it doesn't + + :raises SourceNotFound: if the source file was not found on the referred storage. + """ + raise NotImplementedError + + def connect(self): + """ + Establishes the actual connection to the referred RSE. + + :raises RSEAccessDenied: if no connection could be established. 
+ """ + raise NotImplementedError + + def close(self): + """ Closes the connection to RSE.""" + raise NotImplementedError + + def get(self, path, dest, transfer_timeout=None): + """ + Provides access to files stored inside connected the RSE. + + :param path: Physical file name of requested file + :param dest: Name and path of the files when stored at the client + :param transfer_timeout: Transfer timeout (in seconds) + + :raises DestinationNotAccessible: if the destination storage was not accessible. + :raises ServiceUnavailable: if some generic error occured in the library. + :raises SourceNotFound: if the source file was not found on the referred storage. + """ + raise NotImplementedError + + def put(self, source, target, source_dir, transfer_timeout=None): + """ + Allows to store files inside the referred RSE. + + :param source: path to the source file on the client file system + :param target: path to the destination file on the storage + :param source_dir: Path where the to be transferred files are stored in the local file system + :param transfer_timeout: Transfer timeout (in seconds) + + :raises DestinationNotAccessible: if the destination storage was not accessible. + :raises ServiceUnavailable: if some generic error occured in the library. + :raises SourceNotFound: if the source file was not found on the referred storage. + """ + raise NotImplementedError + + def delete(self, path): + """ + Deletes a file from the connected RSE. + + :param path: path to the to be deleted file + + :raises ServiceUnavailable: if some generic error occured in the library. + :raises SourceNotFound: if the source file was not found on the referred storage. + """ + raise NotImplementedError + + def rename(self, path, new_path): + """ Allows to rename a file stored inside the connected RSE. + + :param path: path to the current file on the storage + :param new_path: path to the new file on the storage + + :raises DestinationNotAccessible: if the destination storage was not accessible. + :raises ServiceUnavailable: if some generic error occured in the library. + :raises SourceNotFound: if the source file was not found on the referred storage. + """ + raise NotImplementedError + + def get_space_usage(self): + """ + Get RSE space usage information. + + :returns: a list with dict containing 'totalsize' and 'unusedsize' + + :raises ServiceUnavailable: if some generic error occured in the library. + """ + raise NotImplementedError + + def stat(self, path): + """ + Returns the stats of a file. + + :param path: path to file + + :raises ServiceUnavailable: if some generic error occured in the library. + :raises SourceNotFound: if the source file was not found on the referred storage. + + :returns: a dict with two keys, filesize and adler32 of the file provided in path. + """ + raise NotImplementedError diff --git a/lib/rucio/rse/protocols/protocol.py b/lib/rucio/rse/protocols/protocol.py new file mode 100644 index 0000000..838880f --- /dev/null +++ b/lib/rucio/rse/protocols/protocol.py @@ -0,0 +1,592 @@ +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module defines the base class for implementing a transfer protocol, +along with some of the default methods for LFN2PFN translations. +""" +import hashlib +import logging +from collections.abc import Callable, Mapping +from configparser import NoOptionError, NoSectionError +from typing import TypeVar +from urllib.parse import urlparse + +from rucio.common import config, exception +from rucio.common.constants import RseAttr +from rucio.common.plugins import PolicyPackageAlgorithms +from rucio.rse import rsemanager + +if getattr(rsemanager, 'CLIENT_MODE', None): + from rucio.client.rseclient import RSEClient + +if getattr(rsemanager, 'SERVER_MODE', None): + from rucio.common.types import InternalScope + from rucio.core import replica + from rucio.core.rse import get_rse_vo + + +class RSEDeterministicScopeTranslation(PolicyPackageAlgorithms): + """ + Translates a pfn dictionary into a scope and name + """ + def __init__(self, vo: str = 'def'): + super().__init__() + self.register("def", RSEDeterministicScopeTranslation._default) + self.register("atlas", RSEDeterministicScopeTranslation._atlas) + policy_module = vo + # Uses the same policy as the DeterministicTranslation + if super()._supports(self.__class__.__name__, policy_module): + self.parser = self._get_one_algorithm(self.__class__.__name__, policy_module) + else: + self.parser = self._get_one_algorithm(self.__class__.__name__, "def") + + @classmethod + def register(cls, name: str, func: Callable) -> None: + super()._register(cls.__name__, {name: func}) + + @staticmethod + def _default(parsed_pfn: Mapping[str, str]) -> tuple[str, str]: + """ Translate pfn to name/scope pair + + :param parsed_pfn: dictionary representing pfn containing: + - path: str, + - name: str + :return: tuple containing name, scope + """ + path = parsed_pfn['path'] + scope = path.lstrip('/').split('/')[0] + name = parsed_pfn['name'] + return name, scope + + @staticmethod + def _atlas(parsed_pfn: Mapping[str, str]) -> tuple[str, str]: + """ Translate pfn to name/scope pair + + :param parsed_pfn: dictionary representing pfn containing: + - path: str, + - name: str + :return: tuple containing name, scope + """ + path = parsed_pfn['path'] + if path.startswith('/user') or path.startswith('/group'): + scope = '%s.%s' % (path.split('/')[1], path.split('/')[2]) + name = parsed_pfn['name'] + else: + name, scope = RSEDeterministicScopeTranslation._default(parsed_pfn) + + return name, scope + + +RSEDeterministicScopeTranslation() + + +RSEDeterministicTranslationT = TypeVar('RSEDeterministicTranslationT', bound='RSEDeterministicTranslation') + + +class RSEDeterministicTranslation(PolicyPackageAlgorithms): + """ + Execute the logic for translating a LFN to a path. + """ + + _DEFAULT_LFN2PFN = "hash" + _algorithm_type = "lfn2pfn" + + def __init__(self, rse=None, rse_attributes=None, protocol_attributes=None): + """ + Initialize a translator object from the RSE, its attributes, and the protocol-specific + attributes. + + :param rse: Name of RSE for this translation. + :param rse_attributes: A dictionary of RSE-specific attributes for use in the translation. 
+    :param protocol_attributes: A dictionary of RSE/protocol-specific attributes.
+        """
+        super().__init__()
+        self.rse = rse
+        self.rse_attributes = rse_attributes if rse_attributes else {}
+        self.protocol_attributes = protocol_attributes if protocol_attributes else {}
+
+    @classmethod
+    def supports(cls, name):
+        """
+        Check to see if a specific algorithm is supported.
+
+        :param name: Name of the deterministic algorithm.
+        :returns: True if `name` is an algorithm supported by the translator class, False otherwise.
+        """
+        return super()._supports(cls._algorithm_type, name)
+
+    @classmethod
+    def register(cls, lfn2pfn_callable, name=None):
+        """
+        Provided a callable function, register it as one of the valid LFN2PFN algorithms.
+
+        The callable will receive five arguments:
+            - scope: Scope of the LFN.
+            - name: LFN's path name
+            - rse: RSE name the translation is being done for.
+            - rse_attributes: Attributes of the RSE.
+            - protocol_attributes: Attributes of the RSE's protocol
+        The return value should be the last part of the PFN - it will be appended to the
+        rest of the URL.
+
+        :param lfn2pfn_callable: Callable function to use for generating paths.
+        :param name: Algorithm name used for registration. If None, then `lfn2pfn_callable.__name__` is used.
+        """
+        if name is None:
+            name = lfn2pfn_callable.__name__
+        algorithm_dict = {name: lfn2pfn_callable}
+        super()._register(cls._algorithm_type, algorithm_dict)
+
+    @staticmethod
+    def __hash(scope, name, rse, rse_attrs, protocol_attrs):
+        """
+        Given a LFN, turn it into a sub-directory structure using a hash function.
+
+        This takes the MD5 of the LFN and uses the first four characters as a subdirectory
+        name.
+
+        :param scope: Scope of the LFN.
+        :param name: File name of the LFN.
+        :param rse: RSE for PFN (ignored)
+        :param rse_attrs: RSE attributes for PFN (ignored)
+        :param protocol_attrs: RSE protocol attributes for PFN (ignored)
+        :returns: Path for use in the PFN generation.
+        """
+        del rse
+        del rse_attrs
+        del protocol_attrs
+        hstr = hashlib.md5(('%s:%s' % (scope, name)).encode('utf-8')).hexdigest()
+        if scope.startswith('user') or scope.startswith('group'):
+            scope = scope.replace('.', '/')
+        return '%s/%s/%s/%s' % (scope, hstr[0:2], hstr[2:4], name)
+
+    @staticmethod
+    def __identity(scope, name, rse, rse_attrs, protocol_attrs):
+        """
+        Given a LFN, convert it directly to a path using the mapping:
+
+            scope:path -> scope/path
+
+        :param scope: Scope of the LFN.
+        :param name: File name of the LFN.
+        :param rse: RSE for PFN (ignored)
+        :param rse_attrs: RSE attributes for PFN (ignored)
+        :param protocol_attrs: RSE protocol attributes for PFN (ignored)
+        :returns: Path for use in the PFN generation.
+        """
+        del rse
+        del rse_attrs
+        del protocol_attrs
+        if scope.startswith('user') or scope.startswith('group'):
+            scope = scope.replace('.', '/')
+        return '%s/%s' % (scope, name)
+
+    @staticmethod
+    def __belleii(scope, name, rse, rse_attrs, protocol_attrs):
+        """
+        Given a LFN, convert it directly to a path using the mapping:
+
+            path -> path
+
+        This is valid only for the belleii convention, where the scope can be determined
+        from the LFN using a deterministic function.
+
+        :param scope: Scope of the LFN.
+        :param name: File name of the LFN.
+        :param rse: RSE for PFN (ignored)
+        :param rse_attrs: RSE attributes for PFN (ignored)
+        :param protocol_attrs: RSE protocol attributes for PFN (ignored)
+        :returns: Path for use in the PFN generation.
+ """ + del scope + del rse + del rse_attrs + del protocol_attrs + return name + + @staticmethod + def __ligo(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, convert it directly to a path using the Caltech schema + + e.g.,: ER8:H-H1_HOFT_C02-1126256640-4096 -> + ER8/hoft_C02/H1/H-H1_HOFT_C02-11262/H-H1_HOFT_C02-1126256640-4096 + + :param scope: Scope of the LFN (observing run: ER8, O2, postO1, ...) + :param name: File name of the LFN (E.g., H-H1_HOFT_C02-1126256640-4096.gwf) + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del rse + del rse_attrs + del protocol_attrs + from ligo_rucio import lfn2pfn as ligo_lfn2pfn # pylint: disable=import-error + return ligo_lfn2pfn.ligo_lab(scope, name, None, None, None) + + @staticmethod + def __xenon(scope, name, rse, rse_attrs, protocol_attrs): + """ + Given a LFN, turn it into a two level sub-directory structure based on the scope + plus a third level based on the name + :param scope: Scope of the LFN. + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del rse + del rse_attrs + del protocol_attrs + + return '%s/%s/%s/%s' % (scope[0:7], scope[4:len(scope)], name.split('-')[0] + "-" + name.split('-')[1], name) + + @staticmethod + def __lsst(scope, name, rse, rse_attrs, protocol_attrs): + """ + LFN2PFN algorithm for Rubin-LSST in the ESCAPE project + + Replace convention delimiter '__' by '/' + The Escape instance does use the 'generic' Rucio schema. + + :param scope: Scope of the LFN (ignored) + :param name: File name of the LFN. + :param rse: RSE for PFN (ignored) + :param rse_attrs: RSE attributes for PFN (ignored) + :param protocol_attrs: RSE protocol attributes for PFN (ignored) + :returns: Path for use in the PFN generation. + """ + del scope + del rse + del rse_attrs + del protocol_attrs + return name.replace('__', '/') + + @staticmethod + def __rubinbutler(scope, name, rse, rse_attrs, protocol_attrs): + del rse + del rse_attrs + del protocol_attrs + return '%s' % name + + + @classmethod + def _module_init_(cls): + """ + Initialize the class object on first module load. + """ + cls.register(cls.__hash, "hash") + cls.register(cls.__identity, "identity") + cls.register(cls.__ligo, "ligo") + cls.register(cls.__belleii, "belleii") + cls.register(cls.__xenon, "xenon") + policy_module = None + try: + policy_module = config.config_get('policy', 'lfn2pfn_module') + except (NoOptionError, NoSectionError): + pass + if policy_module: + # TODO: The import of importlib is done like this due to a dependency issue with python 2.6 and incompatibility of the module with py3.x + # More information https://github.com/rucio/rucio/issues/875 + import importlib + importlib.import_module(policy_module) + + cls._DEFAULT_LFN2PFN = config.get_lfn2pfn_algorithm_default() + + def path(self, scope, name): + """ Transforms the logical file name into a PFN's path. 
+
+        :param name: filename
+        :param scope: scope
+
+        :returns: RSE-specific URI of the physical file
+        """
+        algorithm = self.rse_attributes.get(RseAttr.LFN2PFN_ALGORITHM, 'default')
+        if algorithm == 'default':
+            algorithm = RSEDeterministicTranslation._DEFAULT_LFN2PFN
+        algorithm_callable = super()._get_one_algorithm(RSEDeterministicTranslation._algorithm_type, algorithm)
+        return algorithm_callable(scope, name, self.rse, self.rse_attributes, self.protocol_attributes)
+
+
+RSEDeterministicTranslation._module_init_()  # pylint: disable=protected-access
+
+
+class RSEProtocol:
+    """ This class is virtual and acts as a base to inherit new protocols from. It further provides some common functionality which applies to the majority of the protocols."""
+
+    def __init__(self, protocol_attr, rse_settings, logger=logging.log):
+        """ Initializes the object with information about the referred RSE.
+
+        :param protocol_attr: Properties of the requested protocol.
+        :param rse_settings: The RSE settings.
+        :param logger: Optional decorated logger that can be passed from the calling daemons or servers.
+        """
+        self.auth_token = protocol_attr['auth_token']
+        protocol_attr.pop('auth_token')
+        self.attributes = protocol_attr
+        self.translator = None
+        self.renaming = True
+        self.overwrite = False
+        self.rse = rse_settings
+        self.logger = logger
+        if self.rse['deterministic']:
+            self.translator = RSEDeterministicTranslation(self.rse['rse'], rse_settings, self.attributes)
+            if getattr(rsemanager, 'CLIENT_MODE', None) and \
+                    not RSEDeterministicTranslation.supports(self.rse.get('lfn2pfn_algorithm')):
+                # Remote server has an algorithm we don't understand; always make the server do the lookup.
+                setattr(self, 'lfns2pfns', self.__lfns2pfns_client)
+        else:
+            if getattr(rsemanager, 'CLIENT_MODE', None):
+                setattr(self, 'lfns2pfns', self.__lfns2pfns_client)
+            if getattr(rsemanager, 'SERVER_MODE', None):
+                setattr(self, '_get_path', self._get_path_nondeterministic_server)
+
+    def lfns2pfns(self, lfns):
+        """
+        Returns a fully qualified PFN for the file referred to by path.
+
+        :param lfns: logical file names
+
+        :returns: Fully qualified PFN.
+        """
+        pfns = {}
+        prefix = self.attributes['prefix']
+
+        if not prefix.startswith('/'):
+            prefix = ''.join(['/', prefix])
+        if not prefix.endswith('/'):
+            prefix = ''.join([prefix, '/'])
+
+        lfns = [lfns] if isinstance(lfns, dict) else lfns
+        for lfn in lfns:
+            scope, name = str(lfn['scope']), lfn['name']
+            if 'path' in lfn and lfn['path'] is not None:
+                pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'],
+                                                         '://',
+                                                         self.attributes['hostname'],
+                                                         ':',
+                                                         str(self.attributes['port']),
+                                                         prefix,
+                                                         lfn['path'] if not lfn['path'].startswith('/') else lfn['path'][1:]
+                                                         ])
+            else:
+                try:
+                    pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'],
+                                                             '://',
+                                                             self.attributes['hostname'],
+                                                             ':',
+                                                             str(self.attributes['port']),
+                                                             prefix,
+                                                             self._get_path(scope=scope, name=name)
+                                                             ])
+                except exception.ReplicaNotFound as e:
+                    self.logger(logging.WARNING, str(e))
+        return pfns
+
+    def __lfns2pfns_client(self, lfns):
+        """ Provides the PFNs of replicas by querying the Rucio server. Will be assigned to lfns2pfns by the __init__ method if necessary.
+
+        :param lfns: list of DIDs
+
+        :returns: dict with scope:name as keys and PFN as value (in case of errors the Rucio exception is assigned to the key)
+        """
+        client = RSEClient()  # pylint: disable=E0601
+
+        lfns = [lfns] if isinstance(lfns, dict) else lfns
+        lfn_query = ["%s:%s" % (lfn['scope'], lfn['name']) for lfn in lfns]
+        return client.lfns2pfns(self.rse['rse'], lfn_query, scheme=self.attributes['scheme'])
+
+    def _get_path(self, scope, name):
+        """ Transforms the logical file name into a PFN.
+            Suitable for sites implementing the RUCIO naming convention.
+            This implementation is only invoked if the RSE is deterministic.
+
+        :param scope: scope
+        :param name: filename
+
+        :returns: RSE-specific URI of the physical file
+        """
+        return self.translator.path(scope, name)
+
+    def _get_path_nondeterministic_server(self, scope, name):  # pylint: disable=invalid-name
+        """ Provides the path of a replica for non-deterministic sites. Will be assigned to _get_path by the __init__ method if necessary. """
+        vo = get_rse_vo(self.rse['id'])  # pylint: disable=E0601
+        scope = InternalScope(scope, vo=vo)  # pylint: disable=E0601
+        rep = replica.get_replica(scope=scope, name=name, rse_id=self.rse['id'])  # pylint: disable=E0601
+        if 'path' in rep and rep['path'] is not None:
+            path = rep['path']
+        elif 'state' in rep and (rep['state'] is None or rep['state'] == 'UNAVAILABLE'):
+            raise exception.ReplicaUnAvailable('Missing path information and state is UNAVAILABLE for replica %s:%s on non-deterministic storage named %s' % (scope, name, self.rse['rse']))
+        else:
+            raise exception.ReplicaNotFound('Missing path information for replica %s:%s on non-deterministic storage named %s' % (scope, name, self.rse['rse']))
+        if path.startswith('/'):
+            path = path[1:]
+        if path.endswith('/'):
+            path = path[:-1]
+        return path
+
+    def parse_pfns(self, pfns):
+        """
+        Splits the given PFN into the parts known by the protocol. It is also checked whether the provided protocol supports the given PFNs.
+
+        :param pfns: a list of fully qualified PFNs
+
+        :returns: dict with PFN as key and a dict with path and name as value
+
+        :raises RSEFileNameNotSupported: if the provided PFN doesn't match the protocol settings
+        """
+        ret = dict()
+        pfns = [pfns] if isinstance(pfns, str) else pfns
+
+        for pfn in pfns:
+            parsed = urlparse(pfn)
+            scheme = parsed.scheme
+            hostname = parsed.netloc.partition(':')[0]
+            port = int(parsed.netloc.partition(':')[2]) if parsed.netloc.partition(':')[2] != '' else 0
+            while '//' in parsed.path:
+                parsed = parsed._replace(path=parsed.path.replace('//', '/'))
+            path = parsed.path
+            prefix = self.attributes['prefix']
+            while '//' in prefix:
+                prefix = prefix.replace('//', '/')
+
+            # Protect against 'lazily' defined prefixes for RSEs in the repository
+            if not prefix.startswith('/'):
+                prefix = '/' + prefix
+            if not prefix.endswith('/'):
+                prefix += '/'
+
+            if self.attributes['hostname'] != hostname:
+                if self.attributes['hostname'] != 'localhost':  # In the database empty hostnames are replaced with localhost, but for some URIs (e.g. file) a hostname is not included
+                    raise exception.RSEFileNameNotSupported('Invalid hostname: provided \'%s\', expected \'%s\'' % (hostname, self.attributes['hostname']))
+
+            if self.attributes['port'] != port:
+                raise exception.RSEFileNameNotSupported('Invalid port: provided \'%s\', expected \'%s\'' % (port, self.attributes['port']))
+
+            if not path.startswith(prefix):
+                raise exception.RSEFileNameNotSupported('Invalid prefix: provided \'%s\', expected \'%s\'' % ('/'.join(path.split('/')[0:len(prefix.split('/')) - 1]),
+                                                                                                              prefix))  # len(...)-1 due to the leading '/'
+
+            # Splitting parsed.path into prefix, path, filename
+            path = path.partition(prefix)[2]
+            name = path.split('/')[-1]
+            path = '/'.join(path.split('/')[:-1])
+            if not path.startswith('/'):
+                path = '/' + path
+            if path != '/' and not path.endswith('/'):
+                path = path + '/'
+            ret[pfn] = {'path': path, 'name': name, 'scheme': scheme, 'prefix': prefix, 'port': port, 'hostname': hostname, }
+
+        return ret
+
+    def exists(self, path):
+        """
+        Checks if the requested file is known by the referred RSE.
+
+        :param path: Physical file name
+
+        :returns: True if the file exists, False if it doesn't
+
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+        """
+        raise NotImplementedError
+
+    def connect(self):
+        """
+        Establishes the actual connection to the referred RSE.
+
+        :raises RSEAccessDenied: if no connection could be established.
+        """
+        raise NotImplementedError
+
+    def close(self):
+        """ Closes the connection to the RSE."""
+        raise NotImplementedError
+
+    def get(self, path, dest, transfer_timeout=None):
+        """
+        Provides access to files stored inside the connected RSE.
+
+        :param path: Physical file name of the requested file
+        :param dest: Name and path of the file when stored at the client
+        :param transfer_timeout: Transfer timeout (in seconds)
+
+        :raises DestinationNotAccessible: if the destination storage was not accessible.
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+        """
+        raise NotImplementedError
+
+    def put(self, source, target, source_dir, transfer_timeout=None):
+        """
+        Allows storing files inside the referred RSE.
+
+        :param source: path to the source file on the client file system
+        :param target: path to the destination file on the storage
+        :param source_dir: path where the files to be transferred are stored in the local file system
+        :param transfer_timeout: Transfer timeout (in seconds)
+
+        :raises DestinationNotAccessible: if the destination storage was not accessible.
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+        """
+        raise NotImplementedError
+
+    def delete(self, path):
+        """
+        Deletes a file from the connected RSE.
+
+        :param path: path to the file to be deleted
+
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+        """
+        raise NotImplementedError
+
+    def rename(self, path, new_path):
+        """ Allows renaming a file stored inside the connected RSE.
+
+        :param path: path to the current file on the storage
+        :param new_path: path to the new file on the storage
+
+        :raises DestinationNotAccessible: if the destination storage was not accessible.
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+        """
+        raise NotImplementedError
+
+    def get_space_usage(self):
+        """
+        Get RSE space usage information.
+
+        :returns: a list with a dict containing 'totalsize' and 'unusedsize'
+
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        """
+        raise NotImplementedError
+
+    def stat(self, path):
+        """
+        Returns the stats of a file.
+
+        :param path: path to file
+
+        :raises ServiceUnavailable: if some generic error occurred in the library.
+        :raises SourceNotFound: if the source file was not found on the referred storage.
+
+        :returns: a dict with two keys, filesize and adler32 of the file provided in path.
+        """
+        raise NotImplementedError
diff --git a/tests/ingest.yaml b/tests/ingest.yaml
new file mode 100644
index 0000000..d94ff3b
--- /dev/null
+++ b/tests/ingest.yaml
@@ -0,0 +1,14 @@
+brokers: localhost:9092
+
+group_id: "my_test_group"
+
+rses:
+  XRD1:
+    rucio_prefix: root://xrd1:1094//rucio/test
+    fs_prefix: file:///tmp
+  XRD2:
+    rucio_prefix: root://xrd2:1095//rucio/test
+    fs_prefix: file:///tmp
+  XRD3:
+    rucio_prefix: root://xrd3:1096//rucio/test
+    fs_prefix: file:///tmp
diff --git a/tests/rse_mapper.py b/tests/rse_mapper.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test.py b/tests/test.py
new file mode 100644
index 0000000..2e126f9
--- /dev/null
+++ b/tests/test.py
@@ -0,0 +1,27 @@
+import socket
+
+from confluent_kafka import Consumer
+
+from lsst.ctrl.rucio.ingest.message import Message
+from lsst.ctrl.rucio.ingest.rse_mapper import RseMapper
+
+rse_map = RseMapper("ingest.yaml")
+group_id = rse_map.group_id()
+brokers = rse_map.brokers()
+topics = rse_map.topics()
+
+# NB: socket.gethostname must be called; passing the bare function object
+# would send its repr string as the client id.
+conf = {'bootstrap.servers': brokers,
+        'client.id': socket.gethostname(),
+        'group.id': group_id,
+        'auto.offset.reset': 'earliest',
+        'enable.auto.commit': False}
+
+consumer = Consumer(conf)
+consumer.subscribe(topics)
+
+msgs = consumer.consume(num_messages=1)
+
+for msg in msgs:
+    message = Message(msg)
+    rse, url = message.extract_rse_info()
+    s = rse_map.resplice(rse, url)
+    print("url: %s, new url: %s" % (url, s))
+    print()
diff --git a/ups/ctrl_rucio_ingest.cfg b/ups/ctrl_rucio_ingest.cfg
new file mode 100644
index 0000000..4c04dfa
--- /dev/null
+++ b/ups/ctrl_rucio_ingest.cfg
@@ -0,0 +1,20 @@
+# -*- python -*-
+
+import lsst.sconsUtils
+
+# Dependencies that provide header files and/or libraries should be included here.
+# Pure-Python dependencies do not need to be included.
+# Packages that use pybind11 or boost_tests should declare them as build dependencies.
+# Otherwise, the rules for which packages to list here are the same as those for
+# table files.
+dependencies = {
+}
+
+# For packages that build a C++ library and a Python module, the below should be sufficient.
+# Pure-Python packages should set headers=[], libs=[] (not libs=None), and hasSwigFiles=False.
+# For more information, see the sconsUtils Doxygen documentation.
+config = lsst.sconsUtils.Configuration( + __file__, + headers=[], + hasDoxygenInclude=False, +) diff --git a/ups/ctrl_rucio_ingest.table b/ups/ctrl_rucio_ingest.table new file mode 100644 index 0000000..871b8e5 --- /dev/null +++ b/ups/ctrl_rucio_ingest.table @@ -0,0 +1,21 @@ +# List EUPS dependencies of this package here +# - Common third-party packages (boost, python, doxygen) and low-level +# LSST packages can be assumed to be recursively included by +# LSST packages such as utils or daf_base. +# - Any package whose API is used should be listed explicitly +# rather than assuming it will be included recursively. +# - The base package provides lsstimport. You can remove this explicit +# dependency if this package also declares higher-level dependencies. +# - The utils package supports unit tests. This dependency can be removed +# if the package doesn't directly use its APIs in tests. +setupRequired(base) +setupRequired(utils) +setupRequired(sconsUtils) +setupRequired(daf_butler) +setupRequired(obs_lsst) +setupRequired(log) + +# The following is boilerplate for all packages. +# See https://dmtn-001.lsst.io for details on LSST_LIBRARY_PATH. +envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python) +envPrepend(PATH, ${PRODUCT_DIR}/bin)
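The rubinbutler lfn2pfn algorithm registered above returns the LFN name unchanged, so for a deterministic RSE the PFN is simply scheme://host:port + prefix + name. A minimal sketch of exercising the translator directly (assuming a Rucio installation where lib/rucio is importable; the scope and file name below are made-up examples):

    # Sketch: drive the lfn2pfn translation directly.
    # Assumes lib/rucio from this repository is on the Python path; the
    # scope 'raw' and the file name are illustrative, not real data.
    from rucio.rse.protocols.protocol import RSEDeterministicTranslation

    translator = RSEDeterministicTranslation(
        rse='XRD1',
        rse_attributes={'lfn2pfn_algorithm': 'rubinbutler'},
        protocol_attributes={},
    )

    # rubinbutler returns the LFN name unchanged, so the relative path
    # equals the name itself.
    print(translator.path('raw', '20240101/dataset/file.fits'))
    # -> 20240101/dataset/file.fits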
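parse_pfns inverts that construction: it strips the scheme, hostname, port, and prefix from a PFN and returns the remaining path and file name. A sketch using the XRD1 values from tests/ingest.yaml (hostname xrd1, port 1094, prefix /rucio/test/); the auth token and the PFN itself are illustrative, and the base class is instantiated directly only because parse_pfns is one of its concrete methods:

    # Sketch: split a PFN back into prefix, path, and name.
    from rucio.rse.protocols.protocol import RSEProtocol

    protocol_attr = {
        'auth_token': None,        # popped by __init__
        'scheme': 'root',
        'hostname': 'xrd1',
        'port': 1094,
        'prefix': '/rucio/test/',
    }
    rse_settings = {'rse': 'XRD1', 'deterministic': True}

    proto = RSEProtocol(protocol_attr, rse_settings)
    parsed = proto.parse_pfns('root://xrd1:1094//rucio/test/raw/ab/cd/file.fits')
    # {'root://xrd1:1094//rucio/test/raw/ab/cd/file.fits':
    #  {'path': '/raw/ab/cd/', 'name': 'file.fits', 'scheme': 'root',
    #   'prefix': '/rucio/test/', 'port': 1094, 'hostname': 'xrd1'}}
    print(parsed)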